- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我对响应式编程的概念还很陌生。我正在使用 Bonsai ,它通过 C# 公开了一些但不是全部的 .Net rx 命令。
我正在尝试获得像这个弹珠图这样的行为:
input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e
基本上,输入 2 生成应该存储在队列中的事件波。输入 1 用作从该队列发出单个项目的触发器。
当队列为空时,应发出队列的最后一项。我尝试了 zip 和 combineLatest 的各种组合,但无法获得所需的行为。
我还尝试了基于 this post 的 WithLatestFrom
实现,但回想起来我意识到这也不会产生所需的行为。
public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
IObservable<TSource> source,
IObservable<TOther> other)
{
// return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
}
是否有任何运算符或运算符组合会产生此行为?一旦我了解了要使用的运算符,我就可以对 Bonsai 进行实现。
更新 1:2018/05/18
根据 Sentinel 的帖子,我在 Bonsai 命名空间内编写了一个新类 DiscriminatedUnion
。虽然我没有设法指定适当的类型。编译器声明“无法推断 Merge
的类型参数”(在 .Merge(input1.Select...
中)。在哪里添加正确的类型规范?
using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;
namespace Bonsai.Reactive
{
[Combinator]
// [XmlType(Namespace = Constants.XmlNamespace)]
[Description("Implementation of Discriminated Union")]
public class DiscriminatedUnion
{
public IObservable<int?> Process<TInput1, TInput2>(
IObservable<TInput1> input1,
IObservable<TInput2> input2)
{
var merged =
input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
.Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
.Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
{
int? next = state.Item1;
if (val.Item1 == 1)
{
if (state.Item2.Count > 0)
{
next = state.Item2.Dequeue();
}
}
else
{
state.Item2.Enqueue(val.Item2);
}
return Tuple.Create(next, state.Item2, val.Item1);
})
.Where(x => (x.Item1 != null && x.Item3 == 1))
.Select(x => x.Item1);
return merged;
}
}
}
最佳答案
这是使用 NuGet 包 Microsoft.Reactive.Testing
对您的问题(或弹珠图)进行测试的表示:
var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
ReactiveTest.OnNext(1000.Ms(), 1),
ReactiveTest.OnNext(2000.Ms(), 2),
ReactiveTest.OnNext(3000.Ms(), 3),
ReactiveTest.OnNext(4000.Ms(), 4),
ReactiveTest.OnNext(5000.Ms(), 5),
ReactiveTest.OnNext(6000.Ms(), 6),
ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(1400.Ms(), "a"),
ReactiveTest.OnNext(1500.Ms(), "b"),
ReactiveTest.OnNext(1600.Ms(), "c"),
ReactiveTest.OnNext(5500.Ms(), "d"),
ReactiveTest.OnNext(5600.Ms(), "e"),
ReactiveTest.OnNext(5700.Ms(), "f")
);
使用此扩展方法:
public static class TickExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
该问题基本上是一个状态机问题,涉及两个不同类型的可观察对象。解决此问题的最佳方法是使用 Discriminated Union类型,它在 C# 中不存在,因此我们将创建一个。 @Sentinel 的回答是用一个元组来完成的,它也可以工作:
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}
public DUnion(T2 t2)
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}
public bool IsType1 { get; }
public bool IsType2 => !IsType1;
public T1 Type1Item { get; }
public T2 Type2Item { get; }
}
然后我们可以将我们的两个不同类型的流,Select
和 Merge
合并到一个可区分的联合流中,我们可以在其中使用 Scan< 管理状态
。您的状态逻辑有点棘手,但可行:
这是生成的可观察对象(使用 NuGet 包 System.Collections.Immutable
):
var result = input1.Select(i => new DUnion<int, string>(i))
.Merge(input2.Select(s => new DUnion<int, string>(s)))
.Scan((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), (state, dItem) => dItem.IsType1
? state.queue.IsEmpty
? (state.queue, null, false, false) //Is integer, but empty queue, so don't emit item
: state.queue.Dequeue().IsEmpty //Is integer, at least one item: dequeue unless only one item, then emit either way
? (state.queue, state.queue.Peek(), true, true)
: (state.queue.Dequeue(), state.queue.Peek(), false, true)
: state.isFakeEmptyState //Is new string, just add to queue, don't emit
? (state.queue.Dequeue().Enqueue(dItem.Type2Item), null, false, false)
: (state.queue.Enqueue(dItem.Type2Item), (string)null, false, false)
)
.Where(t => t.emit)
.Select(t => t.item);
然后可以按如下方式进行测试:
var observer = scheduler.CreateObserver<string>();
result.Subscribe(observer);
scheduler.Start();
observer.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.
更新:我稍微考虑了一下,我认为将一些运算符放在 Discriminated Union 功能周围是有意义的。这样您就不必明确处理类型:
public static class DUnionExtensions
{
public class DUnion<T1, T2>
{
public DUnion(T1 t1)
{
Type1Item = t1;
Type2Item = default(T2);
IsType1 = true;
}
public DUnion(T2 t2)
{
Type2Item = t2;
Type1Item = default(T1);
IsType1 = false;
}
public bool IsType1 { get; }
public bool IsType2 => !IsType1;
public T1 Type1Item { get; }
public T2 Type2Item { get; }
}
public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
{
return a.Select(x => new DUnion<T1, T2>(x))
.Merge(b.Select(x => new DUnion<T1, T2>(x)));
}
public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
TState initialState,
Func<TState, T1, TState> type1Handler,
Func<TState, T2, TState> type2Handler)
{
return source.Scan(initialState, (state, u) => u.IsType1
? type1Handler(state, u.Type1Item)
: type2Handler(state, u.Type2Item)
);
}
}
有了这些扩展方法,解决方案就变成了这样,我认为这样读起来更好:
var result = input1
.Union(input2)
.ScanUnion((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false),
(state, _) => state.queue.IsEmpty
? (state.queue, null, false, false) //empty queue, so don't emit item
: state.queue.Dequeue().IsEmpty //At least one item: dequeue unless only one item, then emit either way
? (state.queue, state.queue.Peek(), true, true) //maintain last item, enter Fake-EmptyState
: (state.queue.Dequeue(), state.queue.Peek(), false, true),
(state, s) => state.isFakeEmptyState
? (state.queue.Dequeue().Enqueue(s), null, false, false)
: (state.queue.Enqueue(s), (string)null, false, false)
)
.Where(t => t.emit)
.Select(t => t.item);
如果您在命名元组语法方面遇到问题,那么您可以使用旧的元组:
var result = input1
.Union(input2)
.ScanUnion(Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
(state, _) => state.Item1.IsEmpty
? Tuple.Create(state.Item1, (string)null, false, false) //empty queue, so don't emit item
: state.Item1.Dequeue().IsEmpty //At least one item: dequeue unless only one item, then emit either way
? Tuple.Create(state.Item1, state.Item1.Peek(), true, true) //maintain last item, enter Fake-EmptyState
: Tuple.Create(state.Item1.Dequeue(), state.Item1.Peek(), false, true),
(state, s) => state.Item3
? Tuple.Create(state.Item1.Dequeue().Enqueue(s), (string)null, false, false)
: Tuple.Create(state.Item1.Enqueue(s), (string)null, false, false)
)
.Where(t => t.Item4)
.Select(t => t.Item2);
关于c# - .Net 中的 Reactive Rx zip 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50395951/
我遇到一种情况,我需要从某个主题读取(正在进行的)消息并将它们放入另一个 Queue 中。我怀疑我是否需要 jms Queue 或者我可以对内存中的 java Queue 感到满意。我将通过同一 jv
队列的定义 队列(Queue):先进先出的线性表 队列是仅在队尾进行插入和队头进行删除操作的线性表 队头(front):线性表的表头端,即可删除端 队尾(rear):线性表的表尾端,即可插入端 由于这
Redis专题-队列 首先,想一想 Redis 适合做消息队列吗? 1、消息队列的消息存取需求是什么?redis中的解决方案是什么? 无非就是下面这几点: 0、数据可以顺序读
0. 学习目标 栈和队列是在程序设计中常见的数据类型,从数据结构的角度来讲,栈和队列也是线性表,是操作受限的线性表,它们的基本操作是线性表操作的子集,但从数据类型的角度来讲,它们与线性表又有着巨大的不
我想在 redis + Flask 和 Python 中实现一个队列。我已经用 RQ 实现了这样的查询,如果你有 Flask 应用程序和任务在同一台服务器上工作,它就可以正常工作。我想知道是否有可能创
我正在使用 Laravel 5.1,我有一个大约需要 2 分钟来处理的任务,这个任务特别是生成报告...... 现在,很明显,我不能让用户在我接受用户输入的同一页面上等待 2 分钟,而是我应该在后台处
我正在使用 Azure 队列,并且有多个不同的进程从队列中读取数据。 我的系统的构建方式假设每条消息只读取一次。 这个Microsoft article声称 Azure 队列具有至少一次传送保证,这可
我正在创建一个Thread::Queue元素数组。 我这样做是这样的: for (my $i=0; $i new; } 但是,当我在每个队列中填充这样的元素时 $queues[$index]->enq
我试图了解如何将我的 Mercurial 补丁推送到远程存储库(例如 bitbucket.org),而不必先应用它们(实际上提交它们)。我的动机是在最终完成之前首先对我的工作进行远程备份,并且能够与其
我的本地计算机上有一个 Mercurial 队列补丁,我需要与同事共享该补丁,但我不想将其提交到上游存储库。有没有一种简单的方法可以打包该补丁并与他分享? 最佳答案 mq 将补丁作为不带扩展名的文
Java 中是否有任何类提供与 Queue 相同的功能,但有返回对象的选项,并且不要删除它,只需将其设置在集合末尾? 最佳答案 Queue不直接提供这样的方法。但是,您可以使用 poll 和 add
我在Windows上使用Tortoise svn客户端,我需要能够一次提交来自不同子文件夹的更改文件-一次提交。像在提交之前将文件添加到队列中之类的?我该怎么做? Windows上是否还有另一个svn
好吧,我正在尝试对我的 DSAQueue 类进行单元测试,它显示我的 isEmpty()、isFull() 和 dequeue() 方法失败。 以下是我的 DSAQueue 代码。我认为我的 Dequ
我想尽量减少对传入请求的数据库查询。它目前需要写入 6 个不同的表。在返回响应之前不需要完成处理。因此,我考虑了 laravel 队列,但我想知道我是否也可以摆脱写入队列/作业表所需的单独查询。我可以
我正在学习队列数据结构。我想用链表创建队列。我想编程输出:10 20程序输出:队列为空-1 队列为空-1 我哪里出错了? 代码如下: class Node { int x; Node next
“当工作人员有空时,他们会根据主题的优先级列表从等待请求池中进行选择。在时间 t 到达的所有请求都可以在时间 t 进行分配。如果两名工作人员同时有空,则安排优先权分配给最近的工作最早安排的人。如果仍然
我正在开发一个巨大的应用程序,它使用一些子菜单、模式窗口、提示等。 现在,我想知道在此类应用程序中处理 Esc 和单击外部事件的正确方法。 $(document).keyup(function(e)
所以 如果我有一个队列 a --> b --> NULL; 当我使用函数时 void duplicate(QueueNodePtr pHead, QueueNodePtr *pTail) 它会给 a
我正在尝试为键盘输入实现 FIFO 队列,但似乎无法让它工作。我可以让键盘输入显示在液晶显示屏上,但这就是我能做的。我认为代码应该读取键盘输入并将其插入队列,然后弹出键盘输入并将值读取到液晶屏幕上。有
我正在学习算法和 DS。如何在 JavaScript 中使用队列? 我知道你可以做这样的事情。 var stack = []; stack.push(2); // stack is now
我是一名优秀的程序员,十分优秀!