- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
对于 TPL 数据流中 BroadcastCopyBlock
的以下实现,我很高兴提供一些输入,它将接收到的消息复制给所有注册到 BroadcastCopyBlock
的消费者,并且保证传递给所有消费者,这些消费者在收到消息时链接到 block 。 (与不保证消息传递的 BroadcastBlock
不同,如果下一个消息进来,则在前一个消息已传递给所有消费者之前)。
我主要关心的是消息的保留和保留的释放。如果接收 block 决定不处理消息,会发生什么?我的理解是,这会造成内存泄漏,因为消息将无限期保留。我在想,我应该以某种方式将消息标记为未使用,但我不确定如何标记。我在考虑一些人工消息接收器(没有任何操作的 ActionBlock
),或者我可以将消息标记为已丢弃吗?
我们也欢迎您提供有关实现的进一步意见。
这可能几乎是以下问题的重复,但我更愿意使用我自己的类,而不是创建 block 的方法。或者这会被认为是糟糕的风格吗?
BroadcastBlock with Guaranteed Delivery in TPL Dataflow
/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
private ITargetBlock<T> In { get; }
/// <summary>
/// Holds a TransformBlock for each target, that subscribed to this block
/// </summary>
private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();
public BrodcastCopyBlock()
{
In = new ActionBlock<T>(message => Process(message));
In.Completion.ContinueWith(task =>
{
if (task.Exception == null)
Complete();
else
Fault(task.Exception);
}
);
}
/// <summary>
/// Creates a transform source block for the passed target.
/// </summary>
/// <param name="target"></param>
private void CreateOutBlock(ITargetBlock<T> target)
{
if (_OutBlocks.ContainsKey(target))
return;
var outBlock = new TransformBlock<T, T>(e => e);
_OutBlocks[target] = outBlock;
}
private void Process(T message)
{
foreach (var outBlock in _OutBlocks.Values)
{
outBlock.Post(message);
}
}
/// <inheritdoc />
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
/// <inheritdoc />
public void Complete()
{
foreach (var outBlock in _OutBlocks.Values)
{
((ISourceBlock<T>)outBlock).Complete();
}
}
/// <inheritdoc />
public void Fault(Exception exception)
{
foreach (var outBlock in _OutBlocks.Values)
{
((ISourceBlock<T>)outBlock).Fault(exception);
}
}
/// <inheritdoc />
public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));
/// <inheritdoc />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
{
CreateOutBlock(target);
return _OutBlocks[target].LinkTo(target, linkOptions);
}
/// <inheritdoc />
public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
{
return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <inheritdoc />
public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
}
/// <inheritdoc />
public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
}
}
最佳答案
TL/DR
您的实现使用 Post
内部的 ActionBlock
方法,如果目标拒绝消息,它仍然会丢失数据,切换到 SendAsync
一个,并且,您可能不需要实现所有这些方法,您只需要 ITargetBlock<in TInput>
接口(interface)实现。
在回到您的主要问题之前,我想澄清一些事情。我认为您对 TPL Dataflow
库中的一些选项感到困惑,我想在这里稍微解释一下。您所说的 The first consumer, which receives the message, deletes it from the queue
行为与 BroadcastBlock
无关,它与为 ISourceBlock
链接的多个消费者有关,例如 BufferBlock
:
var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this one will go only for one consumer, no console output present
buffer.Post(1);
BroadcastBlock
所做的正是您所说的,请考虑以下代码:
private static void UnboundedCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Unbounded Block: {i}");
});
broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2
但是,这只能在传入数据的速度小于处理数据的速度时才能完成,因为在其他情况下,您的内存会因为缓冲区增长而很快结束,正如您在问题中所述。让我们看看如果我们使用 ExecutionDataflowBlockOptions
来限制慢 block 的传入数据缓冲区会发生什么:
private static void BoundedCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Bounded Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1
如您所见,我们的慢 block 丢失了最后一条消息,这不是我们要找的。原因是默认情况下 BroadcastBlock
使用 Post
方法传递消息。根据 official Intro Document :
- Post
- An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow for the target to consume the message at a later time.
- SendAsync
- An extension method that asynchronously sends to target blocks while supporting buffering. A
Post
operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered and the target must instead be forced to decline.SendAsync
enables asynchronous posting of the data with buffering, such that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.
因此,这种方法可以帮助我们完成任务,让我们介绍一些包装器 ActionBlock
,它完全符合我们的要求 - SendAsync
用于我们的真实处理器的数据:
private static void BoundedWrapperInfiniteCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW Wrapper Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowAction.Completion.Wait();
}
输出将是
FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2
但是这种等待永远不会结束——我们的基本包装器不会传播链接 block 的完成,并且 ActionBlock
不能链接到任何东西。我们可以尝试等待包装完成:
private static void BoundedWrapperFiniteCase()
{
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"SLOW finite Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < 3; ++i)
{
broadcast.SendAsync(i);
}
broadcast.Complete();
slowActionWrapper.Completion.Wait();
}
输出将是
FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0
这绝对不是我们想要的 - ActionBlock
完成了所有工作,并且不会等待最后一条消息的发布。此外,我们甚至没有看到第二条消息,因为我们在 Sleep
方法结束之前就退出了该方法!因此,您肯定需要自己的实现。
现在,最后,关于您的代码的一些想法:
ITargetBlock<in TInput>
,因此只需实现该接口(interface)。Post
中的 ActionBlock
方法,如我们所见,如果消费者方面出现某些问题,这可能会导致数据丢失。请考虑使用 SendAsync
方法。Completion
任务的实现实际上颠倒了数据流的顺序 - 您正在等待目标完成,我认为这不是好的做法 - 您可能应该为数据流创建一个结束 block (这可能是甚至 NullTarget
block ,它只是同步地丢弃传入的消息),并等待它完成。关于c# - 用于 TPL 数据流的 BroadcastCopyBlock 保证交付,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40605301/
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎不是关于 a specific programming problem, a softwar
有没有办法保证您的系统托盘图标被删除? 添加系统托盘图标: Shell_NotifyIcon(NIM_ADD, &m_tnd); 删除系统托盘图标: Shell_NotifyIcon(NIM_DELE
是否保证(-x) % m,其中x和m在c++中为正standard (c++0x) 为负数,等于 -(x % m)? 我知道它在我知道的所有机器上都是正确的。 最佳答案 除了Luchian的回答,这是
可能还有其他方法可以作为示例,但这不是我要问的重点。 我正在这样做: (future (clojure.java.shell/sh "sleep" "3" :dir "/tmp")) 启动对Shell
可以使用 XREAD(或者可能是另一个命令)以原子方式检测数据是否写入 Redis 流? 进一步来说: 假设您在一个进程中将一些数据添加到 Redis 流中,并看到数据已通过某个自动生成的 key 成
Kotlin 协程是否提供任何“发生之前”保证? 例如,在这种情况下,写入 mutableVar 和随后在(可能)其他线程上读取之间是否存在“发生之前”保证: suspend fun doSometh
我正在开发一个跟踪行程的应用程序。在搜索了这件事之后,我得出结论,实现这一点(持续跟踪用户的位置)的最好方法是使用前台服务。在某些情况下工作得很好,但在其他一些情况下(即使关闭 DOZE),我得到一些
我正在使用 ORM (sqlalchemy) 从 PG 数据库中获取数据。我想避免在我手工编写的 SQL 语句中指定所有表列名称*。 到目前为止,我的假设是返回的列按照用于创建数据库表的 DDL 语句
在 setState 的文档中这样说: setState() does not immediately mutate this.state but creates a pending state tr
我有一个与不同硬件接口(interface)的简单应用程序。对于每个硬件,我针对一个独特的监视器函数生成了一个 pthread_t,总共有 6 个线程:1 个管理线程和 5 个工作线程。 每个线程都有
目前,我有 private ThreadLocal shortDateFormat = new ThreadLocal() { @Override protected DateFormat i
我有一个使用 SolrCloud 将文档写入 Solr 的 Java 作业。输入数据被转换为不同实体的映射,然后将每个实体写入与其实体类型对应的 Solr 集合。 我的代码如下: public voi
我们使用嵌入式设备通过串行到以太网转换器将数据包从串行端口发送到服务器。我们使用的一家制造商 Moxa 将始终以与构建它们相同的方式发送数据包。意思是,如果我们构建一个大小为 255 的数据包,它将始
我是从 C++ 转到 Java 的。在 C++ 世界中,我们关注异常安全,并注意到变元器可以在变元器本身或其委托(delegate)的方法抛出异常时提供不同的保证(最小、强、不抛出)。实现具有强异常保
我想将来自 SAAJ 的 SOAPConnectionFactory 和 MessageFactory 类与多个线程一起使用,但事实证明我不能假设它们是线程安全的。一些相关的帖子: javax.xml
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 5 年前。 Improve
关于正确性,我找不到以下代码片段没有设计缺陷的证据/反证据。 template class MyDirtyPool { public: template std::size_t ad
对于这个问题,我找到了不同的答案,我知道一定有一个确定的答案。 C 中四种主要数据类型的最小分配内存大小是多少? int , double , float , 和 char是我在想什么。做 signe
我正在使用 Kafka Producer,我的应用程序将具有相同键的各个 ProducerRecords 发送到单个分区中,然后这些 ProducerRecords 在发送到代理之前进行批处理(使用
您好,我是服务器端编程 (java) 的新手,正在阅读 SendRedirect 与 Forward 之间的区别。来自 Post-redirect-get pattern它解释说这将阻止通过点击刷新按
我是一名优秀的程序员,十分优秀!