gpt4 book ai didi

c# - .net Rx : in-order batch-processing of messages

转载 作者:行者123 更新时间:2023-11-30 15:09:04 24 4
gpt4 key购买 nike

我正在尝试使用 Rx 实现异步工作流,但我似乎完全错了。

我想做的是:

From an undefined asynchronous stream of un-parsed message strings (i.e. an IObservable<string>)
parse the message strings asynchronously, but preserve their order. (IObservable<Message>)
Batch up parsed Messages in groups of 100 or so (IObservable<IEnumerable<Message>>)
Send each batch, when complete, to the UI thread to be processed. Batches must arrive in the same order they were started.

我似乎无法获得顺序保留,而且 Rx 似乎也没有像我期望的那样异步执行操作。

我尝试通过使用 IEnumerable 而不是 IObservable 来保存顺序,然后在其上调用 .AsParallel().AsOrdered() 运算符。这是代码。请参阅下面的注释以了解我遇到的问题:

    private IObservable<IEnumerable<Message>> messageSource;
public IObservable<IEnumerable<Message>> MessageSource { get { return messageSource; } }

/// <summary>
/// Sub-classes of MessageProviderBase provide this IEnumerable to
/// generate unparsed message strings synchronously
/// </summary>
protected abstract IEnumerable<string> UnparsedMessages { get; }

public MessageProviderBase()
{
// individual parsed messages as a PLINQ query
var parsedMessages = from unparsedMessage in UnparsedMessages.AsParallel().AsOrdered()
select ParseMessage(unparsedMessage);

// convert the above PLINQ query to an observable, buffering up to 100 messages at a time
var batchedMessages
= parsedMessages.ToObservable().BufferWithTimeOrCount(TimeSpan.FromMilliseconds(200), 100);

// ISSUE #1:
// batchedMessages seems to call OnNext before all of the messages in its buffer are parsed.
// If you convert the IObservable<Message> it generates to an enumerable, it blocks
// when you try to enumerate it.

// Convert each batch to an IEnumerable
// ISSUE #2: Even if the following Rx query were to run asynchronously (it doesn't now, see the above comment),
// it could still deliver messages out of order. Only, instead of delivering individual
// messages out of order, the message batches themselves could arrive out of order.
messageSource = from messageBatch in batchedMessages
select messageBatch.ToEnumerable().ToList();
}

最佳答案

我下面的回答有点基于 Enigmativity 的代码,但修复了一些与完成相关的竞争条件,还增加了对取消和自定义调度程序的支持(这将使单元测试变得更加容易)。

public static IObservable<U> Fork<T, U>(this IObservable<T> source,
Func<T, U> selector)
{
return source.Fork<T, U>(selector, Scheduler.TaskPool);
}

public static IObservable<U> Fork<T, U>(this IObservable<T> source,
Func<T, U> selector, IScheduler scheduler)
{
return Observable.CreateWithDisposable<U>(observer =>
{
var runningTasks = new CompositeDisposable();

var lockGate = new object();
var queue = new Queue<ForkTask<U>>();
var completing = false;
var subscription = new MutableDisposable();

Action<Exception> onError = ex =>
{
lock(lockGate)
{
queue.Clear();
observer.OnError(ex);
}
};

Action dequeue = () =>
{
lock (lockGate)
{
var error = false;
while (queue.Count > 0 && queue.Peek().Completed)
{
var task = queue.Dequeue();
observer.OnNext(task.Value);
}
if (completing && queue.Count == 0)
{
observer.OnCompleted();
}
}
};

Action onCompleted = () =>
{
lock (lockGate)
{
completing = true;
dequeue();
}
};

Action<T> enqueue = t =>
{
var cancellation = new MutableDisposable();
var task = new ForkTask<U>();

lock(lockGate)
{
runningTasks.Add(cancellation);
queue.Enqueue(task);
}

cancellation.Disposable = scheduler.Schedule(() =>
{
try
{
task.Value = selector(t);

lock(lockGate)
{
task.Completed = true;
runningTasks.Remove(cancellation);
dequeue();
}
}
catch(Exception ex)
{
onError(ex);
}
});
};

return new CompositeDisposable(runningTasks,
source.AsObservable().Subscribe(
t => { enqueue(t); },
x => { onError(x); },
() => { onCompleted(); }
));
});
}

private class ForkTask<T>
{
public T Value = default(T);
public bool Completed = false;
}

这里有一个随机化任务执行时间的例子来测试它:

AutoResetEvent are = new AutoResetEvent(false);

Random rand = new Random();

Observable.Range(0, 5)
.Fork(i =>
{
int delay = rand.Next(50, 500);
Thread.Sleep(delay);

return i + 1;
})
.Subscribe(
i => Console.WriteLine(i),
() => are.Set()
);

are.WaitOne();

Console.ReadLine();

关于c# - .net Rx : in-order batch-processing of messages,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4867703/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com