gpt4 book ai didi

c# - react 性扩展 - 引发异步事件并订阅特定线程

转载 作者:太空宇宙 更新时间:2023-11-03 14:12:41 25 4
gpt4 key购买 nike

我有一系列使用 RX 发布/订阅模型的模块。

这是事件注册代码(每个订阅模块重复):

_publisher.GetEvent<DataEvent>()                 
.Where(sde => sde.SourceName == source.Name)
.ObserveOn(Scheduler.TaskPool)
.Subscribe(module.OnDataEvent);

发布者很简单,感谢José Romaniello's code :

public class EventPublisher : IEventPublisher
{
private readonly ConcurrentDictionary<Type, object> _subjects =
new ConcurrentDictionary<Type, object>(); public IObservable<TEvent> GetEvent<TEvent>()
{
var subject = (ISubject<TEvent>)_subjects.GetOrAdd(typeof(TEvent), t => new Subject<TEvent>());
return subject.AsObservable();
}
public void Publish<TEvent>(TEvent sampleEvent)
{
object subject;
if (_subjects.TryGetValue(typeof(TEvent), out subject))
{
((ISubject<TEvent>)subject).OnNext(sampleEvent);
}
}
}

现在我的问题是:正如您在上面看到的,我使用 .ObserveOn(Scheduler.TaskPool) 方法为每个模块、每个事件从池中分离出一个新线程。这是因为我有很多事件和模块。当然,问题是事件在时间顺序上混淆了,因为一些事件彼此靠近触发,然后最终以错误的顺序调用 OnDataEvent 回调(每个 OnDataEvent 都带有时间戳)。

有没有简单的方法使用RX来保证事件的正确顺序?或者我可以编写自己的调度程序来确保每个模块按顺序获取事件吗?

当然,事件以正确的顺序发布。

提前致谢。

最佳答案

尝试使用 EventPublisher 的这个实现:

public class EventPublisher : IEventPublisher
{
private readonly EventLoopScheduler _scheduler = new EventLoopScheduler();
private readonly Subject<object> _subject = new Subject<object>();

public IObservable<TEvent> GetEvent<TEvent>()
{
return _subject
.Where(o => o is TEvent)
.Select(o => (TEvent)o)
.ObserveOn(_scheduler);
}

public void Publish<TEvent>(TEvent sampleEvent)
{
_subject.OnNext(sampleEvent);
}
}

它使用 EventLoopScheduler 来确保所有事件按顺序发生在同一个后台线程上。

从您的订阅中删除 ObserveOn,因为如果您在另一个线程上观察,您可能会再次以错误的顺序发生事件。

这是否解决了您的问题?

关于c# - react 性扩展 - 引发异步事件并订阅特定线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7345265/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com