- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在多个线程中的UDP上接收消息。每次接待后,我都会提出MessageReceived.OnNext(message)
。
因为我使用多个线程,所以消息无序引发,这是一个问题。
如何通过消息计数器命令消息的引发?
(让我们说有一个message.counter属性)
必须记住,一条消息可能会在通信中丢失(如果在X消息未填满之后我们有一个埋藏的漏洞,我会提出下一条消息)
必须尽快引发消息(如果收到下一个计数器)
最佳答案
在说明检测丢失消息的要求时,您没有考虑到最后一条消息未到达的可能性。我添加了一个timeoutDuration
,如果在给定时间内没有任何响应,则刷新缓冲的消息-您可能想将其视为错误,请参阅注释以了解如何执行此操作。
我将通过定义具有以下签名的扩展方法来解决此问题:
public static IObservable<TSource> Sort<TSource>(
this IObservable<TSource> source,
Func<TSource, int> keySelector,
TimeSpan timeoutDuration = new TimeSpan(),
int gapTolerance = 0)
source
是未排序消息流keySelector
是从消息中提取int
密钥的功能。我假设寻找的第一个键是0;必要时进行修改。 timeoutDuration
,如果省略,则没有超时tolerance
是等待出现故障的消息时保留的最大消息数。传递0
以保存任意数量的消息scheduler
是用于超时的调度程序,用于测试目的,如果未指定,则使用默认值。 scheduler = scheduler ?? Scheduler.Default;
OnCompleted
,则发送
timeoutDuration
。
if(timeoutDuration != TimeSpan.Zero)
source = source.Timeout(
timeoutDuration,
Observable.Empty<TSource>(),
scheduler);
TimeoutException
,只需删除
Timeout
的第二个参数-空流,以选择执行此操作的重载。请注意,我们可以安全地与所有订阅者共享此文件,因此它位于
Observable.Create
的调用之外。
Observable.Create
构建流。每当发生订阅时,即会调用作为
Create
的参数的lambda函数,并将调用观察者(
o
)传递给我们。
Create
返回我们的
IObservable<T>
,因此我们在这里返回它。
return Observable.Create<TSource>(o => { ...
nextKey
中跟踪下一个预期的键值,并创建一个
SortedDictionary
来保存乱序消息,直到可以发送它们为止。
int nextKey = 0;
var buffer = new SortedDictionary<int, TSource>();
OnNext
处理程序。下一条消息分配给
x
:
return source.Subscribe(x => { ...
keySelector
函数从消息中提取密钥:
var key = keySelector(x);
// drop stale keys
if(key < nextKey) return;
nextKey
发送消息:
if(key == nextKey)
{
nextKey++;
o.OnNext(x);
}
nextKey
碰到缓冲区中的第一个键,因为它是
SortedDictionary
,所以方便地使用下一个最低键:
else if(key > nextKey)
{
buffer.Add(key, x);
if(gapTolerance != 0 && buffer.Count > gapTolerance)
nextKey = buffer.First().Key;
}
nextKey
,因此我们必须小心通过引用传递它。我们只需遍历缓冲区读取,删除和发送消息,只要键彼此跟随即可,每次都增加
nextKey
:
private static void SendNextConsecutiveKeys<TSource>(
ref int nextKey,
IObserver<TSource> observer,
SortedDictionary<int, TSource> buffer)
{
TSource x;
while(buffer.TryGetValue(nextKey, out x))
{
buffer.Remove(nextKey);
nextKey++;
observer.OnNext(x);
}
}
OnError
处理程序-这将通过任何错误,包括Timeout异常(如果您选择采用这种方式)。
OnCompleted
。在这里,我选择清空缓冲区-如果乱序消息阻止了消息并且从未到达,则这是必要的。这就是为什么我们需要超时的原因:
() => {
// empty buffer on completion
foreach(var item in buffer)
o.OnNext(item.Value);
o.OnCompleted();
});
public static IObservable<TSource> Sort<TSource>(
this IObservable<TSource> source,
Func<TSource, int> keySelector,
int gapTolerance = 0,
TimeSpan timeoutDuration = new TimeSpan(),
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
if(timeoutDuration != TimeSpan.Zero)
source = source.Timeout(
timeoutDuration,
Observable.Empty<TSource>(),
scheduler);
return Observable.Create<TSource>(o => {
int nextKey = 0;
var buffer = new SortedDictionary<int, TSource>();
return source.Subscribe(x => {
var key = keySelector(x);
// drop stale keys
if(key < nextKey) return;
if(key == nextKey)
{
nextKey++;
o.OnNext(x);
}
else if(key > nextKey)
{
buffer.Add(key, x);
if(gapTolerance != 0 && buffer.Count > gapTolerance)
nextKey = buffer.First().Key;
}
SendNextConsecutiveKeys(ref nextKey, o, buffer);
},
o.OnError,
() => {
// empty buffer on completion
foreach(var item in buffer)
o.OnNext(item.Value);
o.OnCompleted();
});
});
}
private static void SendNextConsecutiveKeys<TSource>(
ref int nextKey,
IObserver<TSource> observer,
SortedDictionary<int, TSource> buffer)
{
TSource x;
while(buffer.TryGetValue(nextKey, out x))
{
buffer.Remove(nextKey);
nextKey++;
observer.OnNext(x);
}
}
rx-testing
,则将在运行以下测试工具时运行以下命令:
public static void Main()
{
var tests = new Tests();
tests.Test();
}
public class Tests : ReactiveTest
{
public void Test()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable(
OnNext(100, 0),
OnNext(200, 2),
OnNext(300, 1),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 3),
OnNext(700, 7),
OnNext(800, 8),
OnNext(900, 9),
OnNext(1000, 6),
OnNext(1100, 12),
OnCompleted(1200, 0));
//var results = scheduler.CreateObserver<int>();
xs.Sort(
keySelector: x => x,
gapTolerance: 2,
timeoutDuration: TimeSpan.FromTicks(200),
scheduler: scheduler).Subscribe(Console.WriteLine);
scheduler.Start();
}
}
关于udp - 订购 react 性扩展事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26450521/
我在 Mac OsX 10.11 上使用 Xcode 7.0.1 (7A1001) 我使用 carthage 0.9.2 通过以下购物车文件下载reactivecocoa github“Reactiv
我正在将一个对象从属性“模型”(我从 Laravel 中的 Blade 属性模型中获得)分配给数据属性模型。后来数据属性模型发生变化,因为它绑定(bind)到表单输入字段。但 Prop “模型”也发生
当我更新数组内对象的属性然后作为组件的 Prop 传递时,在 svelte 中触发 react 性的正确方法是什么? let items = [{ id: 1, name: 'first'
我是 DRY principle 的坚定拥护者: Every piece of knowledge must have a single, unambiguous, authoritative rep
我正在实现一个需要以下功能的线程: 及时响应终止请求 推送消息 在等待消息时保持对 SendMessage 请求的响应 我对消息泵的初始实现使用了 GetMessage,如下所示: while not
在我的应用程序中,用户获得了一份已到达她的文档列表,并且可以对每个文档执行操作。 文件是分批提交的,当这种情况发生时,列表会增加。这一切都很好,这是预期的行为,但最好有一个按钮“暂停实时数据”,它会忽
我有一个属性为 的数据对象 displaySubtotal 我可以通过以下方式更新该属性的值: data.displaySubtotal = numPad.valueAsAString(); 我的方法
我需要一个垂直 slider 输入。由于内置的 sliderInput 函数无法做到这一点,因此我选择自己实现。根据this thread可以 (I) 使用 CSS 旋转 sliderInput
我正在从自定义用户权限管理系统迁移到 Alanning:roles v2.0 .我有一个非常基本的结构: 基本用户 用户组,每个用户组都有特定的设置。我将它们存储在一个“组”集合中。 管理群组的用户的
Shiny 中的响应式(Reactive)表达式将更改传播到需要去的地方。我们可以使用 isolate 来抑制一些这种行为。 ,但是我们可以抑制基于我们自己的逻辑表达式传播的更改吗? 我给出的例子是一
是否有(或可能有) react 性 Parsec (或任何其他纯函数式解析器)在 Haskell 中? 简而言之,我想逐个字符地为解析器提供数据,并获得与我提供的足够多的结果一样多的结果。 或者更简单
HTML(JADE) p#result Lorem ipsum is javascript j s lo 1 2 4 this meteor thismeteor. meteor input.sear
我有一个被导入函数更改的对象。 https://svelte.dev/repl/e934087af1dc4a25a1ee52cf3fd3bbea?version=3.12.1 我想知道如何使我的更改反
我有一个YUV 420半平面格式的图像,其中字节以这种方式存储: [Y1 Y2 ... [U1 V1.... Yk Yk+1...] Uk' Uk'+1] 其中Y平面的大小是UV平面的两倍,并
如何使用 ReactiveCocoa 订阅从 NSMutableDictionary 添加和删除的对象?另外,我想在它发生变化时广播通知。我的猜测是可以使用 RACMulticastConnectio
我正在构建一个带有多个选项卡的应用程序,其中一些选项卡涉及过多的计算,而另一些选项卡的计算速度很快。一个允许用户在 react 性或手动更新之间进行选择的复选框,与“刷新”按钮结合使用,将是理想的选择
我知道您可以在获取集合时使用 reactive: false 关闭 react 性。如何在内容可编辑区域内的集合字段中实现相同的效果?示例: Template.documentPage.events(
我想在 z3 中表示一个哈希函数,比如 SHA(x)。在做了一些研究之后,似乎 z3 不能很好地支持注入(inject)性,所以我不能有像这样的约束(虽然我意识到这并不是严格意义上的碰撞,但作为一种启
我正在解决一个问题,我想在仪表板中将数据显示为图表(通过 perak:c3 )和表格(通过 aslagle:reactive-table )。我的问题是数据是从 MongoDB 中的集合中提取的,它的
我的 ViewModel 中有这个函数,它返回一个信号,但内部 block 不起作用,我尝试添加断点,但它没有中断。这是我的代码。 func executeLoginAPI() -> RACSigna
我是一名优秀的程序员,十分优秀!