gpt4 book ai didi

c# - 我如何使用 Reactive Extensions 将 IObservable.Throttle() 与其他一些事件源结​​合起来?

转载 作者:行者123 更新时间:2023-12-04 04:37:05 28 4
gpt4 key购买 nike

我有一个简单的搜索字段,它应该在用户停止输入或按下搜索 后自动搜索。第一部分可以通过以下方式轻松实现

var inputs = Observable.FromEventPattern<SomeEventArgs>(searchBar, "TextChanged")
.Select(pattern => pattern.EventArgs.SearchText)
.Throttle(TimeSpan.FromMilliseconds(500));

我也可以像这样链接它来进行实际搜索

var results = from query in inputs
from results in Observable.FromAsync<Something>(() => Search(query))
select results;

但问题是,当用户按下搜索按钮时,无法向前跳转。根据我对 Rx 的理解,代码可能应该是这样的:

// inputs = the same event stream without Throttle
// buttonClicks = construct event stream for search button clicks

// ??? somehow make a third stream which lets a value through
// ??? only after a delay and when the value hasn't changed,
// ??? OR when the second stream yields a value

// async search

我可以看到如何通过使用类似 Stopwatch 的东西并在用户输入时重置它来命令式地编写它,如果点击通过我可以跳过它。但在 Rx 的世界里,它可能看起来像(请原谅伪 linq 代码)

from query in inputs
where (query.isLast() and query.timestamp > 500.ms.ago) or buttonClicked
...

如果第二个事件源产生一个值,或者如果没有值,那么我需要能够立即完成最后一个查询输入,就像使用 Throttle 一样等待指定的延迟

最佳答案

首先,典型的搜索 Rx 如下所示:

var searchResults = Observable.FromEventPattern<SomeEventArgs>(searchBar, "TextChanged")
.Select(pattern => pattern.EventArgs.SearchText)
.Throttle(TimeSpan.FromMilliseconds(500))
.DistinctUntilChanged()
.Select(text => Observable.Start(() => Search(text)))
.Switch()

Select 为您提供结果流,Switch 将返回最近创建的流。我添加了 DistinctUntilChanged 以防止提交重复的查询。

您所描述的一种策略是为 throttle 提供一个 throttle 持续时间选择器,该选择器将在 throttle 持续时间之后或单击按钮时发出。我拼凑了一个示例 ViewModel,它避免使用 Rx 2.1 以外的任何库来展示如何做到这一点。这就是整个 ViewModel - 我会将 View 和存储库留给您自己想象,但应该清楚它们的作用。

最后的警告 - 我试图让这个示例保持简短,并省略了不必要的细节,这些细节可能会影响理解,因此这还没有准备好生产:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using System.Windows.Input;
using StackOverflow.Rx.Annotations;
using StackOverflow.Rx.Model;

namespace StackOverflow.Rx.ProductSearch
{
public class ClassicProductSearchViewModel : INotifyPropertyChanged
{
private string _query;
private IProductRepository _productRepository;
private IList<Product> _productSearchResults;

public ClassicProductSearchViewModel(IProductRepository productRepository)
{
_productRepository = productRepository;
// Wire up a Button from the view to this command with a binding like
// <Button Content="Search" Command="{Binding ImmediateSearch}"/>
ImmediateSearch = new ReactiveCommand();

// Wire up the Query text from the view with
// a binding like <TextBox MinWidth="100" Text="{Binding Query, UpdateSourceTrigger=PropertyChanged}"/>
var newQueryText = Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
h => PropertyChanged += h,
h => PropertyChanged -= h)
.Where(@event => @event.EventArgs.PropertyName == "Query")
.Select(_ => Query);

// This duration selector will emit EITHER after the delay OR when the command executes
var throttleDurationSelector = Observable.Return(Unit.Default)
.Delay(TimeSpan.FromSeconds(2))
.Merge(ImmediateSearch.Select(x => Unit.Default));


newQueryText
.Throttle(x => throttleDurationSelector)
.DistinctUntilChanged()
/* Your search query here */
.Select(
text =>
Observable.StartAsync(
() => _productRepository.FindProducts(new ProductNameStartsWithSpecification(text))))
.Switch()
.ObserveOnDispatcher()
.Subscribe(products => ProductSearchResults = new List<Product>(products));
}

public IList<Product> ProductSearchResults
{
get { return _productSearchResults; }
set
{
if (Equals(value, _productSearchResults)) return;
_productSearchResults = value;
OnPropertyChanged();
}
}

public ReactiveCommand ImmediateSearch { get; set; }

public string Query
{
get { return _query; }
set
{
if (value == _query) return;
_query = value;
OnPropertyChanged();
}
}

public event PropertyChangedEventHandler PropertyChanged;

[NotifyPropertyChangedInvocator]
protected virtual void OnPropertyChanged([CallerMemberName] string propertyName = null)
{
PropertyChangedEventHandler handler = PropertyChanged;
if (handler != null) handler(this, new PropertyChangedEventArgs(propertyName));
}
}

// A command that is also an IObservable!
public class ReactiveCommand : ICommand, IObservable<object>
{
private bool _canExecute = true;
private readonly Subject<object> _execute = new Subject<object>();

public ReactiveCommand(IObservable<bool> canExecute = null)
{
if (canExecute != null)
{
canExecute.Subscribe(x => _canExecute = x);
}
}

public bool CanExecute(object parameter)
{
return _canExecute;
}

public void Execute(object parameter)
{
_execute.OnNext(parameter);
}

public event EventHandler CanExecuteChanged;

public IDisposable Subscribe(IObserver<object> observer)
{
return _execute.Subscribe(observer);
}
}
}

有像 Rxx 和 ReactiveUI 这样的库可以使这段代码更简单——我在这里没有使用它们,所以发生的“魔法”很少!

我在这个例子中的 ReactiveCommand 是 ReactiveUI 中包含的一个简单实现。这看起来同时是一个命令和一个 IObservable。无论何时执行,它都会流式传输命令参数。

这里是一个使用来自作者博客的 ReactiveUI 的例子:http://blog.paulbetts.org/index.php/2010/06/22/reactivexaml-series-reactivecommand/

In another answer I look at the variable throttling feature in isolation.

关于c# - 我如何使用 Reactive Extensions 将 IObservable<T>.Throttle() 与其他一些事件源结​​合起来?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19566517/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com