gpt4 book ai didi

java - Apache Kafka的 Spring :具有异步请求,批处理和最大运行中1的KafkaTemplate行为

转载 作者:行者123 更新时间:2023-12-01 23:49:51 24 4
gpt4 key购买 nike

场景/用例:
我有一个Spring Boot应用程序,使用Spring for Kafka将消息发送到Kafka主题。在完成特定事件(通过http请求触发)后,将创建一个新线程(通过Spring @Async),该线程调用kafkatemplate.send()并具有返回的ListenableFuture回调。处理http请求的原始线程将响应返回给调用客户端,并被释放。

正常行为:
在正常的应用程序负载下,我已经验证了各个消息是否均已按期望发布到该主题(在回调成功或失败以及在kafka集群上的主题中查看消息后,应用程序日志条目)。如果我关闭所有Kafka代理3-5分钟,然后使群集恢复联机,则应用程序的发布者会立即重新建立与kafka的连接并继续发布消息。

问题行为:
但是,在执行负载测试时,如果我关闭所有kafka代理3-5分钟,然后使集群恢复联机,则Spring应用程序的发布者将继续显示所有发布尝试的失败。此过程持续约7个小时,此时发布者可以再次成功与kafka重新建立通信(通常是管道异常中断造成的,但并非总是如此)。

最新发现:
在执行负载测试时,大约10分钟后,我使用JConsole连接到应用程序,并监视了通过kafka.producer公开的producer metrics。在第一个约内30秒的重载后,缓冲区可用字节继续减少,直到达到0并保持为0。等待线程保持在6到10之间(每次我刷新时交替出现),缓冲区可用字节保持在0左右。 6.5小时在该缓冲区可用字节显示所有最初分配的内存已还原后,但kafka发布尝试继续失败约。再过30分钟,最终才恢复所需的行为。

当前生产者配置

request.timeout.ms=3000
max.retry.count=2
max.inflight.requests=1
max.block.ms=10000
retry.backoff.ms=3000


所有其他属性都使用 their default values

问题:


考虑到我的用例,更改batch.size或linger.ms对消除重载时遇到的问题有任何积极影响?
假设我有单独的线程,所有调用kafkatemplate.send()的消息和回调都分开,并且我将max.in.flight.requests.per.connection设置为1,则忽略了batch.size和linger.ms,超出了每个线程的大小限制信息?我的理解是,在这种情况下,实际上没有进行批处理,并且每个消息都是作为单独的请求发送的。
鉴于我将max.block.ms设置为10秒,为什么缓冲存储器仍然使用了这么长时间,以及为什么所有消息仍然无法发布这么长时间?我的理解是,在10秒钟后,每次新的发布尝试都将失败并返回失败回调,从而释放关联的线程


更新:
尝试阐明线程用法。我正在使用JavaDocs中推荐的单个生产者实例。有诸如https-jsse-nio-22443-exec- *之类的线程正在处理传入的https请求。当请求到来时,将进行一些处理,并且一旦所有与非Kafka相关的逻辑都完成后,将调用另一个装饰有@Async的类中的方法。该方法调用kafkatemplate.send()。在执行发布到kafka之前,日志中会显示返回给客户端的响应(这是Im通过单独的线程来验证其响应的方式,因为服务在返回响应之前不会等待发布)。
有一些task-scheduler- *线程似乎正在处理来自kafkatemplate.send()的回调。我的猜测是,单个kafka-producer-network-thread可以处理所有发布。

最佳答案

我的应用程序发出一个http请求,并在每次kafka发布失败时将每条消息发送到数据库平台上的死信表。用来执行发布到kafka的相同线程被重新用于此对数据库的调用。我将数据库调用逻辑移到另一个类中,并用其自己的@Async和自定义TaskExecutor装饰了它。完成此操作后,我监视了JConsole,可以看到对Kafka的调用似乎正在重新使用相同的10个线程(TaskExecutor: core Pool size - 10, QueueCapacity - 0, and MaxPoolSize - 80),并且对数据库服务的调用现在正在使用单独的线程池(TaskExecutor: core Pool size - 10, QueueCapacity - 0, and MaxPoolSize - 80),该线程池始终一致关闭和打开新线程,但保持相对恒定的线程数。通过这种新行为,缓冲区可用字节保持在正常的恒定水平,并且一​​旦代理重新联机,应用程序的kafka发布者就可以成功地重新建立连接。

关于java - Apache Kafka的 Spring :具有异步请求,批处理和最大运行中1的KafkaTemplate行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58223741/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com