- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在多个线程中的UDP上接收消息。每次接待后,我都会提出MessageReceived.OnNext(message)
。
因为我使用多个线程,所以消息无序引发,这是一个问题。
如何通过消息计数器命令消息的引发?
(让我们说有一个message.counter属性)
必须记住,一条消息可能会在通信中丢失(如果在X消息未填满之后我们有一个埋藏的漏洞,我会提出下一条消息)
必须尽快引发消息(如果收到下一个计数器)
最佳答案
在说明检测丢失消息的要求时,您没有考虑到最后一条消息未到达的可能性。我添加了一个timeoutDuration
,如果在给定时间内没有任何响应,则刷新缓冲的消息-您可能想将其视为错误,请参阅注释以了解如何执行此操作。
我将通过定义具有以下签名的扩展方法来解决此问题:
public static IObservable<TSource> Sort<TSource>(
this IObservable<TSource> source,
Func<TSource, int> keySelector,
TimeSpan timeoutDuration = new TimeSpan(),
int gapTolerance = 0)
source
是未排序消息流keySelector
是从消息中提取int
密钥的功能。我假设寻找的第一个键是0;必要时进行修改。 timeoutDuration
,如果省略,则没有超时tolerance
是等待出现故障的消息时保留的最大消息数。传递0
以保存任意数量的消息scheduler
是用于超时的调度程序,用于测试目的,如果未指定,则使用默认值。 scheduler = scheduler ?? Scheduler.Default;
OnCompleted
,则发送
timeoutDuration
。
if(timeoutDuration != TimeSpan.Zero)
source = source.Timeout(
timeoutDuration,
Observable.Empty<TSource>(),
scheduler);
TimeoutException
,只需删除
Timeout
的第二个参数-空流,以选择执行此操作的重载。请注意,我们可以安全地与所有订阅者共享此文件,因此它位于
Observable.Create
的调用之外。
Observable.Create
构建流。每当发生订阅时,即会调用作为
Create
的参数的lambda函数,并将调用观察者(
o
)传递给我们。
Create
返回我们的
IObservable<T>
,因此我们在这里返回它。
return Observable.Create<TSource>(o => { ...
nextKey
中跟踪下一个预期的键值,并创建一个
SortedDictionary
来保存乱序消息,直到可以发送它们为止。
int nextKey = 0;
var buffer = new SortedDictionary<int, TSource>();
OnNext
处理程序。下一条消息分配给
x
:
return source.Subscribe(x => { ...
keySelector
函数从消息中提取密钥:
var key = keySelector(x);
// drop stale keys
if(key < nextKey) return;
nextKey
发送消息:
if(key == nextKey)
{
nextKey++;
o.OnNext(x);
}
nextKey
碰到缓冲区中的第一个键,因为它是
SortedDictionary
,所以方便地使用下一个最低键:
else if(key > nextKey)
{
buffer.Add(key, x);
if(gapTolerance != 0 && buffer.Count > gapTolerance)
nextKey = buffer.First().Key;
}
nextKey
,因此我们必须小心通过引用传递它。我们只需遍历缓冲区读取,删除和发送消息,只要键彼此跟随即可,每次都增加
nextKey
:
private static void SendNextConsecutiveKeys<TSource>(
ref int nextKey,
IObserver<TSource> observer,
SortedDictionary<int, TSource> buffer)
{
TSource x;
while(buffer.TryGetValue(nextKey, out x))
{
buffer.Remove(nextKey);
nextKey++;
observer.OnNext(x);
}
}
OnError
处理程序-这将通过任何错误,包括Timeout异常(如果您选择采用这种方式)。
OnCompleted
。在这里,我选择清空缓冲区-如果乱序消息阻止了消息并且从未到达,则这是必要的。这就是为什么我们需要超时的原因:
() => {
// empty buffer on completion
foreach(var item in buffer)
o.OnNext(item.Value);
o.OnCompleted();
});
public static IObservable<TSource> Sort<TSource>(
this IObservable<TSource> source,
Func<TSource, int> keySelector,
int gapTolerance = 0,
TimeSpan timeoutDuration = new TimeSpan(),
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
if(timeoutDuration != TimeSpan.Zero)
source = source.Timeout(
timeoutDuration,
Observable.Empty<TSource>(),
scheduler);
return Observable.Create<TSource>(o => {
int nextKey = 0;
var buffer = new SortedDictionary<int, TSource>();
return source.Subscribe(x => {
var key = keySelector(x);
// drop stale keys
if(key < nextKey) return;
if(key == nextKey)
{
nextKey++;
o.OnNext(x);
}
else if(key > nextKey)
{
buffer.Add(key, x);
if(gapTolerance != 0 && buffer.Count > gapTolerance)
nextKey = buffer.First().Key;
}
SendNextConsecutiveKeys(ref nextKey, o, buffer);
},
o.OnError,
() => {
// empty buffer on completion
foreach(var item in buffer)
o.OnNext(item.Value);
o.OnCompleted();
});
});
}
private static void SendNextConsecutiveKeys<TSource>(
ref int nextKey,
IObserver<TSource> observer,
SortedDictionary<int, TSource> buffer)
{
TSource x;
while(buffer.TryGetValue(nextKey, out x))
{
buffer.Remove(nextKey);
nextKey++;
observer.OnNext(x);
}
}
rx-testing
,则将在运行以下测试工具时运行以下命令:
public static void Main()
{
var tests = new Tests();
tests.Test();
}
public class Tests : ReactiveTest
{
public void Test()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable(
OnNext(100, 0),
OnNext(200, 2),
OnNext(300, 1),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 3),
OnNext(700, 7),
OnNext(800, 8),
OnNext(900, 9),
OnNext(1000, 6),
OnNext(1100, 12),
OnCompleted(1200, 0));
//var results = scheduler.CreateObserver<int>();
xs.Sort(
keySelector: x => x,
gapTolerance: 2,
timeoutDuration: TimeSpan.FromTicks(200),
scheduler: scheduler).Subscribe(Console.WriteLine);
scheduler.Start();
}
}
关于udp - 订购 react 性扩展事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26450521/
我正在尝试以一种可以根据需要循环输出和输出 header 的方式编写 SQL Server 2008 查询。我已经多次以错误的方式完成这些工作,让 ColdFusion 在页面中完成艰苦的工作,但需要
为什么通用寄存器按原样排序(eax、ecx、edx、ebx)?例如,对于“inc”指令,操作码是: inc eax - 40 inc ecx - 41 inc edx - 42 inc ebx - 4
晚上好。 我需要组织一个HashMap,它是这样的: private HashMap>taskOrdered = new HashMap>(); 每个“任务”都是一个具有 3 个属性的类: 人员姓名;
这个问题在这里已经有了答案: C# - sorting by a property (3 个答案) how to sort a collection by datetime in c# (4 个答案
我有以下存储记录和错误的 XML 文档(如果需要可以重新设计)。 11/03/2010 14:12:41 1 11/03/2
关于这个主题有很多问题,仍然无法找到解决这个问题的方法。 我正在做的查询是: SELECT `b`.`ads_id` AS `ads_id`, `b`.`bod_bedrag`
我正在制作一个排名系统。但我想要的是将我得到的结果 ($kn) 从最高到最低排序。我该怎么做? include "includes/core.inc.php"; require "includes/c
我有一个这样的列表, M=[[75], [95, 64], [17, 47, 82], [18, 35, 87, 10], [20, 4, 82, 47, 65], [19, 1, 23, 75, 3
我有 5 个数学问题要解决,30 个人将尝试解决这些问题中的每一个。 我知道每个人解决某个问题的速度有多快: Person1 将能够在 5 秒内解决问题 A,在 7 秒内解决问题 B,在 20 秒内解
考虑 val animals = List("penguin","ferret","cat").toSeq val rdd = sc.makeRDD(animals, 1) 我想订购这个 RDD。我是
我有以下列表: 我的 list [[1]] [1] 11 [[2]] [1] 9 [[3]] [1] 10 我想对它进行排序。我试过了 sort(mylist) Error: mylist must
sort(v; alg::Algorithm=defalg(v), lt=isless, by=identity, rev::Bool=false, order::Ordering=Forward)
我有一个数据框,我想生成一个 geom_tile()从中绘制,但我希望图形的排序不是基于字母顺序而是基于此数据框中的变量。 structure(list(V1 = c("a", "y", "w", "
我列出 Facebook 好友是这样的: FB.api('/me/friends?fields=id,name,updated_time&date_format=U&',
我正在多个线程中的UDP上接收消息。每次接待后,我都会提出MessageReceived.OnNext(message)。 因为我使用多个线程,所以消息无序引发,这是一个问题。 如何通过消息计数器命令
我与两个实体有一对多关系: Order: int OrderId string OrderNumber ... OrderItem: int ItemId int sequence decimal Q
我正在尝试从用户将输入的数字(代码)中对结构进行排序,并且我正在使用冒泡排序。我希望程序打印按数字(代码)排序的所有数据,但它只对数字(代码)进行排序。有人可以帮我对数字(代码)中的其他元素进行排序吗
对于我的通用网格,我目前这样做是为了激活排序: Elements.OrderBy(column.SortExpression).AsQueryable(); 其中SortExpression类型为 F
给定两个 DOM 元素,比如 a 和 b,我们如何确定哪个在文档中排在第一位? 我正在对一组元素实现拖放操作。并且可以以任意顺序选择元素,但是在拖动它们时,需要以“正确”的顺序移动这些元素。 最佳答案
我正在将我们的管理系统从 PHP 迁移到 Ruby On Rails。旧系统的一部分使用 SQL 来构建客户列表,然后按最后一次联系的时间对他们进行排序: SELECT `cust
我是一名优秀的程序员,十分优秀!