gpt4 book ai didi

c# - 使用 Reactive Extensions 制作 Action 的通用调度程序

转载 作者:行者123 更新时间:2023-11-30 16:18:58 25 4
gpt4 key购买 nike

我正在对 Reactive Extensions 进行大量试验,现在我正在尝试制作一个系统,在该系统中,我可以对过程进行排队并以我想要的任何方式执行它们,同时能够向订阅者发送通知。

我目前将我的数据库访问权限封装在一个 UserAccess 类中,该类公开了添加用户的方法。在该方法中,我想对将用户添加到数据库的操作进行排队。所以我创建了一个 T 类的 JobProcessor,它公开了一个方法 QueueJob(Action) 并让我的用户实现了这个类。我的问题是我看不到如何从 Observable 的 OnNext 方法中调用 Action,因为该操作采用 User 参数。

一定是我的攻角不对,一定是我对设计的把握有问题。例如,我知道我应该以某种方式将我的用户传递给 QueueJob 过程,但我不知道如何以干净的方式进行。

    public class UserAccess : JobProcessor<User>
{
public void AddUser(User user)
{
QueueJob(usr =>
{
using (var db = new CenterPlaceModelContainer())
{
db.Users.Add(usr);
}

});
[...]

public abstract class JobProcessor<T>
{
// Either Subject<T> or Subject<Action<T>>
private Subject<Action<T>> JobSubject = new Subject<Action<T>>();

public JobProcessor()
{
JobSubject
/* Insert Rx Operators Here */
.Subscribe(OnJobNext, OnJobError, OnJobComplete);
}

private void OnJobNext(Action<T> action)
{
// ???
}

private void OnJobError(Exception exception)
{

}

private void OnJobComplete()
{

}

public void QueueJob(Action<T> action)
{
JobSubject.OnNext(action);
}
}

编辑 1 :

我尝试将QueueJob的签名改为

QueueJob(T entity, Action<T> action)

现在我可以做

QueueJob(user, usr => { ... } );

但是好像不是很直观。我还没有看到很多框架可以同时传递实体和 Action 。这样一来,我还不如不需要 JobProcessor。

编辑 2 :我将 JobProcessor 的主题类型更改为主题,完全删除了 T。因为不需要在过程中包含用户,因为我可以在外部引用它。现在唯一的问题是,如果我传递给 QueueJob 的操作的用户在操作执行的实际时间之间发生变化,则用户将拥有修改后的信息。不受欢迎,但我想我会继续寻找解决方案。

我的代码现在是(使用 Buffer 作为示例):

public abstract class JobProcessor
{
public Subject<Action> JobSubject = new Subject<Action>();

public JobProcessor()
{
JobSubject
.Buffer(3)
.Subscribe(OnJobNext, OnJobError, OnJobComplete);
}

private void OnJobNext(IList<Action> actionsList)
{
foreach (var element in actionsList)
{
element();
}
}

private void OnJobError(Exception exception)
{

}

private void OnJobComplete()
{

}

public void QueueJob(Action action)
{
JobSubject.OnNext(action);
}
}

最佳答案

首先,我必须同意 Lee 和 NSGaga 的观点,您可能不想这样做 - 生产者/消费者队列还有其他模式更符合(我认为)您要在这里完成什么。

就是说,既然我永远无法抗拒挑战……通过一些小的调整,您就可以消除“我将什么传递到 Action 中?”的直接问题。只需捕获传入的用户参数并将其直接设置为 Action - 这是经过一些修改的代码:

public class UserAccess : JobProcessor
{
public void AddUser(User user)
{
QueueJob(() =>
{
using (var db = new CenterPlaceModelContainer())
{
db.Users.Add(user);
}

});
[...]

public abstract class JobProcessor
{
// Subject<Action>
private Subject<Action> JobSubject = new Subject<Action>();

public JobProcessor()
{
JobSubject
/* Insert Rx Operators Here */
.Subscribe(OnJobNext, OnJobError, OnJobComplete);
}

private void OnJobNext(Action action)
{
// Log something saying "Yo, I'm executing an action" here?
action();
}

private void OnJobError(Exception exception)
{
// Log something saying "Yo, something broke" here?
}

private void OnJobComplete()
{
// Log something saying "Yo, we shut down" here?
}

public void QueueJob(Action action)
{
JobSubject.OnNext(action);
}
}

关于c# - 使用 Reactive Extensions 制作 Action<T> 的通用调度程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15627481/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com