gpt4 book ai didi

java - 可观察到像 Lmax Disruptor 这样的批处理

转载 作者:行者123 更新时间:2023-12-03 16:25:32 28 4
gpt4 key购买 nike

熟悉的人lmax ring buffer (disruptor)知道该数据结构的最大优点之一是它可以对传入事件进行批处理,当我们有一个消费者可以利用批处理使系统自动适应负载时,你抛给它的事件越多越好。

我想知道我们不能用 Observable 实现相同的效果(针对批处理功能)。我试过了 Observable.buffer但这是非常不同的,缓冲区将等待并且不会在预期数量的事件未到达时发出批处理。我们想要的完全不同。

鉴于订阅者正在等待来自 Observable<Collection<Event>> 的批处理,当单个项目到达流时,它会发出单个元素批处理,该批处理由订阅者处理,而它正在处理其他元素到达并收集到下一批中,一旦订阅者完成执行,它就会获得下一批作为自上次开始处理以来已经到达的许多事件......

因此,如果我们的订阅者足够快,可以一次处理一个事件,它就会这样做,如果负载变高,它仍将具有相同的处理频率,但每次都会处理更多事件(从而解决背压问题)。 .. 不像缓冲区会粘住并等待批处理填满。

有什么建议吗?还是我应该使用环形缓冲区?

最佳答案

RxJava 和 Disruptor 代表了两种不同的编程方法。

我没有使用 Disruptor 的经验,但根据视频谈话,它基本上是一个大缓冲区,生产者像消防水管一样发出数据,消费者旋转/产生/阻止直到数据可用。

另一方面,RxJava 的目标是非阻塞事件传递。我们也有环形缓冲区,特别是在 observeOn 中,它充当生产者和消费者之间的异步边界,但它们要小得多,我们通过应用协程方法来避免缓冲区溢出和缓冲区膨胀。协同例程归结为发送到您的回调的回调,因此您可以回调我们的回调以按照您的节奏向您发送一些数据。此类请求的频率决定了节奏。

有些数据源不支持此类合作流式传输,需要 onBackpressureXXX 之一如果下游请求速度不够快,将缓冲/丢弃值的运算符。

如果您认为批量处理数据比逐个处理数据更有效,您可以使用 buffer具有重载以指定缓冲区持续时间的运算符:例如,您可以拥有 10 毫秒的数据,而与在此持续时间内到达的值无关。

通过请求频率控制批量大小很棘手,可能会产生无法预料的后果。一般来说,问题是如果你 request(n)从批处理源中,您表示可以处理 n 个元素,但源现在必须创建 n 个大小为 1 的缓冲区(因为类型是 Observable<List<T>> )。相反,如果没有请求被调用,运算符缓冲数据导致更长的缓冲区。这些行为会在处理过程中引入额外的开销,如果您真的可以跟上并且还必须将冷源变成消防水管(因为否则您所拥有的基本上是 buffer(1) ),这本身现在会导致缓冲区膨胀。

关于java - 可观察到像 Lmax Disruptor 这样的批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33609107/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com