gpt4 book ai didi

c# - Rx 运算符到不同的序列

转载 作者:可可西里 更新时间:2023-11-01 09:11:59 25 4
gpt4 key购买 nike

重要:有关结果的描述和更多详细信息,请同时查看我的回答

我需要对通常被复制的一系列对象/事件进行分组和过滤,并使用 TimeSpan 间隔对它们进行缓冲。我尝试用一​​些大理石图更好地解释它:

X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z

会产生

X---Y---Z---X---Y---Z

其中 X、Y 和 Z 是不同的事件类型,'---' 表示间隔。此外,我还想通过一个关键属性来区分它在所有类型上都可用,因为它们有一个共同的基类:

X, Y, Z : A

并且A包含一个属性Key。使用符号 X.a 表示 X.Key = a,最终示例将是:

X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c

会产生

X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b

任何人都可以帮助我将所需的 Linq 运算符(可能是 DistinctUntilChanged 和 Buffer)放在一起来实现此行为吗?谢谢

更新 18.08.12:

应要求,我尽量给出更好的解释。我们有设备收集事件并将其发送到网络服务。这些设备有一个旧的逻辑(由于向后兼容,我们无法更改它)并且它们会不断发送事件,直到它们收到确认;确认后,他们发送队列中的下一个事件,依此类推。事件包含单元的网络地址和一些其他属性,用于区分每个设备队列中的事件。一个事件看起来像这样:

class Event
{
public string NetworkAddress { get; }

public string EventCode { get; }

public string AdditionalAttribute { get; }
}

目标是每 5 秒处理一次从所有设备接收到的可区分事件,将信息存储在数据库中(这就是我们不想分批处理的原因)并将确认发送到设备。让我们举一个只有两个设备和一些事件的例子:

Device 'a':
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'

Device 'b':
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'

Pn are the operations done by our server, explained later

可能的弹珠图(输入流+输出流):

Device 'a'          : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
Device 'b' : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...

Time : ------------[1s]-----------[2s]------------[3s]------------[4s]-
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-

P1: Server stores and acknowledges [a1] and [b1]
P2: " " " " [b2]
P3: " " " " [a2] and [b3]
P4: " " " " [a3] and [b4]

最后我认为它可能是基本运算符的简单组合,但我是 Rx 的新手并且我有点困惑,因为似乎有很多运算符(或运算符的组合)来获得相同的输出流。

更新 19.08.12:

请记住,此代码在服务器上运行,并且应该运行数天而不会出现内存泄漏...我不确定对象的行为。目前,对于每个事件,我都在服务上调用推送操作,该服务调用主题的 OnNext,我应该在其上构建查询(如果我对主题的使用没记错的话)。

更新 20.08.12:

当前实现,包括验证测试;这是我尝试过的,它似乎与@yamen 的建议相同

public interface IEventService
{
// Persists the events
void Add(IEnumerable<Event> events);
}

public class Event
{
public string Description { get; set; }
}

/// <summary>
/// Implements the logic to handle events.
/// </summary>
public class EventManager : IDisposable
{
private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);

private readonly Subject<EventMessage> subject = new Subject<EventMessage>();

private readonly IDisposable subscription;

private readonly object locker = new object();

private readonly IEventService eventService;

/// <summary>
/// Initializes a new instance of the <see cref="EventManager"/> class.
/// </summary>
/// <param name="scheduler">The scheduler.</param>
public EventManager(IEventService eventService, IScheduler scheduler)
{
this.eventService = eventService;
this.subscription = this.CreateQuery(scheduler);
}

/// <summary>
/// Pushes the event.
/// </summary>
/// <param name="eventMessage">The event message.</param>
public void PushEvent(EventMessage eventMessage)
{
Contract.Requires(eventMessage != null);
this.subject.OnNext(eventMessage);
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
this.Dispose(true);
}

private void Dispose(bool disposing)
{
if (disposing)
{
// Dispose unmanaged resources
}

this.subject.Dispose();
this.subscription.Dispose();
}

private IDisposable CreateQuery(IScheduler scheduler)
{
var buffered = this.subject
.DistinctUntilChanged(new EventComparer())
.Buffer(EventHandlingPeriod, scheduler);

var query = buffered
.Subscribe(this.HandleEvents);
return query;
}

private void HandleEvents(IList<EventMessage> eventMessages)
{
Contract.Requires(eventMessages != null);
var events = eventMessages.Select(this.SelectEvent);
this.eventService.Add(events);
}

private Event SelectEvent(EventMessage message)
{
return new Event { Description = "evaluated description" };
}

private class EventComparer : IEqualityComparer<EventMessage>
{
public bool Equals(EventMessage x, EventMessage y)
{
return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
}

public int GetHashCode(EventMessage obj)
{
var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
return s.GetHashCode();
}
}
}

public class EventMessage
{
public string NetworkAddress { get; set; }

public byte EventCode { get; set; }

public byte Attribute { get; set; }

// Other properties
}

和测试:

public void PushEventTest()
{
const string Address1 = "A:2.1.1";
const string Address2 = "A:2.1.2";

var eventServiceMock = new Mock<IEventService>();

var scheduler = new TestScheduler();
var target = new EventManager(eventServiceMock.Object, scheduler);
var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
scheduler.Schedule(() => target.PushEvent(eventMessageA1));
scheduler.Schedule(TimeSpan.FromSeconds(1), () => target.PushEvent(eventMessageB1));
scheduler.Schedule(TimeSpan.FromSeconds(2), () => target.PushEvent(eventMessageA1));

scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);

eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());

scheduler.Schedule(TimeSpan.FromSeconds(3), () => target.PushEvent(eventMessageB1));

scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);

eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
}

此外,我再次强调,该软件能够毫无问题地运行数天,处理数以千计的消息,这一点非常重要。明确地说:测试未通过当前实现。

最佳答案

我不确定这是否完全符合您的要求,但您可能会使用 group 显式地对元素进行分组。关键字,然后操纵各种IObservable s 在重新组合之前分开。

例如如果我们有类定义,例如

class A
{
public char Key { get; set; }
}

class X : A { }
...

和一个Subject<A>

Subject<A> subject = new Subject<A>();

然后我们可以写

var buffered =
from a in subject
group a by new { Type = a.GetType(), Key = a.Key } into g
from buffer in g.Buffer(TimeSpan.FromMilliseconds(300))
where buffer.Any()
select new
{
Count = buffer.Count,
Type = buffer.First().GetType().Name,
Key = buffer.First().Key
};

buffered.Do(Console.WriteLine).Subscribe();

我们可以使用您提供的数据对此进行测试:

subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100);
subject.OnNext(new X { Key = 'b' });
Thread.Sleep(100);
subject.OnNext(new X { Key = 'a' });
Thread.Sleep(100);
...
subject.OnCompleted();

获取您提供的输出:

{ Count = 2, Type = X, Key = a }
{ Count = 1, Type = X, Key = b }
{ Count = 1, Type = Y, Key = b }
{ Count = 1, Type = Y, Key = c }
{ Count = 2, Type = Z, Key = a }
{ Count = 2, Type = Z, Key = c }
{ Count = 1, Type = Z, Key = b }

关于c# - Rx 运算符到不同的序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11916038/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com