gpt4 book ai didi

c# - ConcurrentBag.ToObservable() 运行一次并提前完成

转载 作者:太空宇宙 更新时间:2023-11-03 21:27:49 24 4
gpt4 key购买 nike

我有一个静态集合,比如调用远程 rest api 的任务:

static ConcurrentBag<Task<HttpResponseMessage>> _collection = new ConcurrentBag<Task<HttpResponseMessage>>();

static void Main(string[] args)
{
Task.Factory.StartNew(() => Produce());
Task.Factory.StartNew(() => Consume());

Console.ReadKey();
}

一个线程向其中添加新项目:

private static void Produce()
{
while (true)
{
var task = HttpClientFactory.Create().GetAsync("http://example.com");
_collection.Add(task);
Thread.Sleep(500);
}
}

另一个线程应该处理这些项目:

private static void Consume()
{
_collection.ToObservable()
.Subscribe(
t => Console.WriteLine("++"),
ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Done"));
}

但它只运行一次并过早完成。所以输出是;

++

Done

最佳答案

如果它能像那样工作那将很有趣……但遗憾的是它没有。 ToObservable扩展方法在 IEnumerable<T> 上定义界面 - 所以它正在获取集合的时间点快照。

你需要一个可以观察到的集合,比如ObservableCollection .有了这个,您可以响应添加事件以提供 Rx 管道(可能通过将 CollectionChanged 事件与 Observable.FromEventPattern 连接起来)。请记住,此集合不支持并发添加。这种技术是“进入 monad”(即获得 IObservable<T> )的一种方式。

等效于将您的请求有效负载添加到主题。无论哪种方式,您都可以将它们转换到异步请求中。所以说(为了争论),你的Produce签名看起来像这样:

private static async Task<HttpResponseMessage> Produce(string requestUrl)

然后你可以构建一个可观察对象,使用你的 Produce 将 requestUrls 转换为异步网络请求。像这样的方法:

var requests = new Subject<string>();
var responses = requests.SelectMany(
x => Observable.FromAsync(() => Produce(x)));

responses.Subscribe(
t => Console.WriteLine("++"),
ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Done"));

然后提交每个请求,例如:

requests.OnNext("http://myurl");

如果您需要并发添加,请参阅 Observable.Synchronize .

如果您需要控制处理响应的线程,请使用 ObserveOn我写了一篇关于 here 的冗长解释.

关于c# - ConcurrentBag.ToObservable() 运行一次并提前完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25902630/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com