- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我一直在寻找一种轻量级的,正在处理的异步消息总线,并且遇到了TPL Dataflow。
我当前的实现如下(在https://gist.github.com/4416655上的完整示例)。
public class Bus
{
private readonly BroadcastBlock<object> broadcast =
new BroadcastBlock<object>(message => message);
private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
= new ConcurrentDictionary<Guid, IDisposable>();
public Task SendAsync<TMessage>(TMessage message)
{
return SendAsync<TMessage>(message, CancellationToken.None);
}
public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
return broadcast.SendAsync(message, cancellationToken);
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));
var subscription = broadcast.LinkTo(handler,
new DataflowLinkOptions { PropagateCompletion = true },
message => message is TMessage);
return AddSubscription(subscription);
}
public void Unsubscribe(Guid subscriptionId)
{
IDisposable subscription;
if (subscriptions.TryRemove(subscriptionId, out subscription))
{
subscription.Dispose();
}
}
private Guid AddSubscription(IDisposable subscription)
{
var subscriptionId = Guid.NewGuid();
subscriptions.TryAdd(subscriptionId, subscription);
return subscriptionId;
}
}
BroadcastBlock<T>
同时将消息发送到多个处理程序的来源?这是我根据this post得出的结论。 BroadcastBlock<T>
实例。处理大量邮件时,这可能会引起问题吗?是否应该为每种消息类型创建一个单独的实例? BroadcastBlock<T>
始终存储最后发送的项目。这意味着任何新的订阅(链接)将自动传递给该消息。可以更改此行为(新订阅应仅接收新消息)。 // Subscribe to Message type
var subscription1 = bus.Subscribe<Message>(async m => {
await Task.Delay(2000);
Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
});
MaxDegreeOfParallelism = 1
没什么区别)。 SendAsync
允许我等待消息的发送,但不允许我等待目标的完成(ActionBlock<T>
)。我以为PropagateCompletion
会这样做,但事实并非如此。理想情况下,我想知道消息的所有处理程序何时执行。 Task.Delay
获得预期的行为的原因是,这延迟了每个处理程序的执行,而不是所有处理程序的处理。
Thread.Sleep
是我所需要的。
最佳答案
在回答了您的问题之后(见下文),我意识到使用TPL Dataflow块对您的设计进行建模可能不是一个好主意。 TDF非常适合通过很大程度上独立的块来处理消息,而没有内置的跟踪单个消息的方式。但这就是您想要的:按处理程序顺序处理消息,并跟踪每条消息的完成情况。
因此,我认为您不应创建整个数据流网络,而应使用单个ActionBlock
作为异步消息处理器:
public class Bus
{
class Subscription
{
public Guid Id { get; private set; }
public Func<object, Task> HandlerAction { get; private set; }
public Subscription(Guid id, Func<object, Task> handlerAction)
{
Id = id;
HandlerAction = handlerAction;
}
}
private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();
private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;
public Bus()
{
// subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
var subscriptions = new List<Subscription>();
m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
async tuple =>
{
var message = tuple.Item1;
var completedAction = tuple.Item2;
// could be made more efficient, probably doesn't matter
Guid idToUnsubscribe;
while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
{
subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
}
Subscription handlerToSubscribe;
while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
{
subscriptions.Add(handlerToSubscribe);
}
foreach (var subscription in subscriptions)
{
await subscription.HandlerAction(message);
}
completedAction();
});
}
public Task SendAsync<TMessage>(TMessage message)
{
var tcs = new TaskCompletionSource<bool>();
Action completedAction = () => tcs.SetResult(true);
m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));
return tcs.Task;
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
return Subscribe<TMessage>(
message =>
{
handlerAction(message);
// we need a completed non-generic Task; this is a simple, efficient way to get it
// another option would be to use async lambda with no await,
// but that's less efficient and produces a warning
return Task.FromResult(false);
});
}
public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
{
Func<object, Task> actionWithCheck = async message =>
{
if (message is TMessage)
await handlerAction((TMessage)message);
};
var id = Guid.NewGuid();
m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
return id;
}
public void Unsubscribe(Guid subscriptionId)
{
m_idsToUnsubscribe.Enqueue(subscriptionId);
}
}
Is
BroadcastBlock<T>
the recommended source for sending messages to multiple handlers concurrently?
BroadcastBlock<T>
是您想要的。当然,TPL Dataflow中没有直接相似的块。
In my implementation I'm using a single BroadcastBlock instance for all message types. Could this cause problems when handling large numbers of messages? Should I create a separate instance per message type?
BroadcastBlock<T>
always stores the last item that was sent. This means that any new subscriptions (links) will automatically be passed this message. Is is possible to change this behaviour (new subscriptions should only receive new messages)?
BroadcastBlock<T>
来做到这一点。如果不需要
BroadcastBlock<T>
的所有功能(发送到具有有限容量的块,这些块可能暂时已满,支持将非贪婪的块作为目标),则可能需要编写自定义版本的
BroadcastBlock<T>
来执行此操作。
When sending a message I expected to see each message output to the console one by one, with 2s increments. Instead, after 2s all the messages were output at once. I'm assuming this is due to the parallelism performed by the underlying scheduler but am curious how I can alter these settings (setting
MaxDegreeOfParallelism = 1
made no difference).
ActionBlock<T>
可能不是最佳解决方案。实际上,TDF可能根本不是最好的解决方案。
Subscribe()
接受
Action<TMessage>
,这意味着您的lambda将被编译为
async void
方法。这些仅应在没有其他选择的特定(且相对罕见)的情况下使用。如果要支持
async
处理程序,则应接受
async Task
方法,即
Func<TMessage, Task>
。
The reason I was not getting the expected behaviour with
Task.Delay
is that this was delaying the execution of each handler, not the processing of all handlers.Thread.Sleep
was what I needed.
Thread.Sleep()
违反了整个异步的想法,如果可能的话,您不应该使用它。另外,我认为它实际上并没有按照您想要的方式工作:它为每个线程引入了延迟,但是TPL Dataflow将使用多个线程,因此这不会达到您的预期。
Finally, whilst
SendAsync
allows me to await the sending of a message, it doesn't allow me to await on the completion of the target (theActionBlock<T>
). I thought that this is whatPropagateCompletion
would do but that does not seem to be the case. Ideally I'd like to know when all handlers for the message have executed.
PropagateCompletion
以及
Complete()
和
Completion
用于处理整个块,而不是处理单个消息。造成这种情况的一个原因是数据流网络更加复杂,可能尚不清楚何时确切地处理了一条消息。例如,如果一条消息已经发送到
BroadcastBlock<T>
的所有当前目标,但是也将发送到所有新添加的目标,是否应该认为它是完整的?
TaskCompletionSource
手动进行某种方式。
关于.net - 使用TPL Dataflow创建消息总线,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14096614/
目录 总线是什么? 常见总线类型有哪些? 总线的串行和并行的区别? 数据总线 地址总线
从下面的代码我在 map 上添加标记,每 15 秒刷新一次并从数据库中获取新的纬度和经度。标记(巴士图像)已成功添加到 map 上并从一个位置平稳移动到另一个位置,就像汽车在路上行驶一样。现在我想要的
如果在小端处理器上运行的程序将未缓存的值 0xaabbccdd 写入地址 0,并且处理器使用 32 位宽的 AXI4 总线,那么 WDATA 的第 31-24 位是 0xaa 还是 0xdd? AXI
是否存在用于将进程内服务消息传递给另一个服务的扭曲机制?我写了一个原型(prototype)总线,看起来像 from collections import defaultdict ch
我修改了设备树文件并使用 4 个 GPIO 引脚启用了 spi,这些引脚支持 pinmux 并从 gpio 切换到 spi 功能。但是在 Linux 内核代码中,代码如何知道使用了哪个 spi 总线/
我正在使用控创嵌入式计算机通过 I2C 与 ST 微 Controller 通信。我正在使用开发适配器与 I2C 接口(interface),使用描述的简单 read() 和 write() 函数 h
我有一个需要 PEC 的 I2C/SMBus 设备我正在为它编写一个内核空间驱动程序。 在 Linux 2.6.37 上我使用 i2c_board_info实例化客户端并在那里设置标志,但现在驱动程序
我想确认我的消息已经通过 socketCAN 库保存在 CAN 总线上。socketCAN 文档描述了使用 recvmsg() 函数时的这种可能性,我对其实现有疑问。 我要实现的功能是确认我的消息在仲
下面是我的代码 #import #import int main(int argc, const char *argv[]) { char *str = "First string";
在大量使用 D-Bus 的应用程序中获得更好的时间性能有哪些好的做法? ? 以下是我们的团队通过硬敲学校学到的一些知识: 尝试将数据实体组合成一个单一的大型结构/对象,以通过 D-Bus IPC 发送
我正在Akka内核下运行Akka应用程序,该程序在其他系统上也可以正常工作。 akka { loggers = ["akka.event.slf4j.Slf4jLogger"] log
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
我是 C 语言新手,对于家庭作业考试,我必须实现一个简单的服务器套接字程序,该程序在循环中发送一些数据,并且如果客户端连接到服务器套接字(已使用 Arduino 完成,但需要相同的功能)在 raspb
我正在构建一个由许多(> 100)个相同节点组成的系统,所有节点均通过 CAN 总线连接。这个想法是所有节点必须具有相同的信息,任何节点都可以生成事件并通过 CAN 广播它。对于这些事件,CAN 帧提
您好,我在解析 IIB Toolkit 中的任何 JSON 时遇到问题。 java计算节点抛出的异常为:java.lang.NoClassDefFoundError: org.json.JSONObj
我买了这个传感器: http://dlnmh9ip6v2uc.cloudfront.net/datasheets/Sensors/Weather/RHT03.pdf 输出为“MaxDetect 1-w
我正在用户空间编写包装器 API,用于在嵌入式 Linux 平台上用 C 语言控制 I2C 总线。我能够使用 read() 和 write() 方法以及 ioctl() 调用来选择从属设备,从而从传感
在我的软件(用 C++ 编写)中,我使用 Linux 标准函数打开 CAN 总线套接字并执行 I/O 操作。 套接字的打开和使用如下: /* Create the socket */ if ((
每天——大约 5 到 10 次——我的 USB 摄像头从系统中消失。它从第一天开始就发生了,因为制造商驱动程序与 Linux 不兼容。 lsusb 和 dmesg 一开始正确显示,但在较长时间后有时会
我分发了包含多个 Go 服务的应用程序。其中一些使用 Kafka 作为数据总线。我能够使用 Jaeger 的 opentracing 追踪服务之间的调用。我在图表上绘制 Kafka 跨度时遇到问题,它
我是一名优秀的程序员,十分优秀!