gpt4 book ai didi

c# - 使用响应式(Reactive)扩展来配对请求和响应

转载 作者:太空宇宙 更新时间:2023-11-03 10:22:28 29 4
gpt4 key购买 nike

场景:

我向服务器队列发送请求(我使用 RabbintMQ 异步处理消息)。当服务器处理请求时,它会向不同的队列产生两个响应。

我想使用 RX 订阅响应,但也可以访问它们相应的请求。

我目前拥有的:

我使用 EventAggregator,它使用响应式(Reactive)扩展并为事件流公开 IObservable:

public interface IEventAggregator
{
void Publish<TEvent>(TEvent sampleEvent);
IObservable<TEvent> GetEvent<TEvent>();
}

当我发送请求时,我会发布一个事件:

eventAggregator.Publish(new RequestSent { ID = Guid.NewGuid(), ... })
//I could later subscribe to the event like this:
//eventAggregator.GetEvent<RequestSent>().Subscribe(..)

当服务器响应时,响应也会发布到 EventAggregator,因此我可以订阅它们:

eventAggregator.GetEvent<ResponseFromQueue1>().Subscribe(OnResponseFromQueue1)

eventAggregator.GetEvent<ResponseFromQueue2>().Subscribe(OnResponseFromQueue2)

我也可以订阅 RequestSent

我需要的:

private void OnResponseFromQueue1(RequestSent request, ResponseFromQueue1 response)
{
I need access to both request and respone
}

这会更好:

private void OnResponse(
RequestSent request,
ResponseFromQueue1 response1,
ResponseFromQueue2 response2)
{
//actually, this would simplify implementation of my logic a lot
}

可以使用 RX 吗?

最佳答案

您可以使用 SelectMany,假设您可以使用 ID 之类的东西将请求与响应相关联

trigger.SelectMany(requestData => {
//We need to share this
var id = Guid.NewGuid();

//Publish your event
eventAggregator.Publish(new Request { ID = id, /*...*/ });

//Start listening for the two responses
//Grab only the first item matching the IDs
var left = eventAggregator.GetEvent<ResponseFromQueue1>().First(res => res.ID == id);
var right = eventAggregator.GetEvent<ResponseFromQueue2>().First(res => res.Id == id);

//We are done once both values have emitted.
return left.Zip(right);
}, (request, responses) => {
/*Here you will have access to the request and an array of the responses*/
});

要记住的一件事是,这段代码现在假设 Publish 将在响应返回之前返回。既然你说这是 RabbitMQ,这可能是一个安全的假设,但如果你对此进行任何单元测试,请记住这一点。

编辑

在您的场景中,您似乎实际拥有:

//Set up the queue first    
eventAggregator.GetEvent<RequestSent>()
.SelectMany(requestSent => {
var id = requestSent.ID;
var left = eventAggregator.GetEvent<ResponseFromQueue1>().First(res => res.ID == id);
var right = eventAggregator.GetEvent<ResponseFromQueue2>().First(res => res.ID == id);

return left.Zip(right);
}, (request, response) => {/**/});

//...Sometime later
eventAggregator.Publish(new Request{});

关于c# - 使用响应式(Reactive)扩展来配对请求和响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32892474/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com