- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我一直在尝试使用 Java Flow API 构建应用程序。虽然在发布者和订阅者的速度不同的情况下能够在发布者和订阅者之间执行背压的想法,但我不确定它是否真正有帮助,因为发布者和消费者通常驻留在同一个应用程序中;至少,几乎所有在线示例都是如此。
例如,在我的应用程序中,有一个发布者生成从 RabbitMQ 检索的消息,还有一个订阅者处理这些消息。因此,消息在 RabbitMQ 监听器中提交给发布者,如下所示:
@RabbitListener(queues = "rabbit_queue")
public void rabbitHandler(MessageObject message) {
// do some stuff to the message and then submit it to the publisher
publisher.submit(message);
}
// Then the message will be processed in the subscriber
如果发布者的生成速度快于订阅者的处理速度,则订阅者可以在 subscription.request(n)
上调用一个较小的 n
值。但是,有两件事我不确定我对 request(n)
如何提供帮助的理解是否正确:
request(n)
中的 n
值。但这意味着发布者中的缓冲区大小将很快填满。我可以增加发布者中的缓冲区大小,但我也受到订阅者所面临的相同资源量的限制,因为发布者和订阅者都在使用同一组资源的同一应用程序中。那么拥有发布者和 request()
方法所带来的额外复杂性有什么意义呢?我很确定我对这个过程的理解有问题,因为对我来说这感觉就像是第 22 条军规的情况。就好像我的两只手只能握住这么多球,而我只是在两只手之间传递球并称之为背压。由于发布者和订阅者都受到相同数量的资源的限制,因为它们都在同一个应用程序中,因此当我可以简单地将消息传递给另一个处理程序并受到相同数量的资源的限制时,具有额外的复杂性有什么好处,如下所示:
public class RabbitMqListener {
@RabbitListener(queues = "rabbit_queue")
public void rabbitHandler(MessageObject message) {
// do some stuff to the message and then submit it to the publisher
MessageProcessor.process(message);
}
}
public class MessageProcessor {
public static void process(MessageObject message) {
System.out.println("processing message...");
}
}
如果有人能帮助我纠正我的理解,那就太好了。
最佳答案
“如果 RabbmitMQ 发送消息的速度比发布者的订阅者可以处理的速度快”
那么您应该尝试将背压反馈扩展到消息的源头,即 RabbmitMQ 发布者。为了这个目标,您可以创建额外的点对点连接。如果您无法减慢 RabbmitMQ 发布者的速度,那么您有 2 个选择:删除一些您无法存储的消息,或者购买性能更高的硬件。
关于java - 由于发布者和订阅者驻留在受相同资源限制的同一个应用程序中,那么背压如何帮助解决双方的负载问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59597834/
我是一名优秀的程序员,十分优秀!