gpt4 book ai didi

java - spring-kafka 使用非常长的任务多次处理同一条消息。

转载 作者:行者123 更新时间:2023-11-29 08:28:27 25 4
gpt4 key购买 nike

这个问题已经被询问和解决了几次,但由于我的知识非常有限,我无法找到我的问题的答案。

我有一个生产者向消费者发送工作任务,该任务大约需要两个小时才能完成。我需要任务只执行一次,但是它完成然后一遍又一遍地重新开始。

我发现我的日志中最有帮助的是

2018-05-15 15:18:23.731  WARN 6888 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=anonymous.1ae85859-db41-4dc2-a7e2-ab4268256e00] Synchronous auto-commit of offsets {consumer-message-0=OffsetAndMetadata{offset=34, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

这让我认为简单地将消费者任务包装在一个线程中就可以解决问题,但事实并非如此。

我的消费者的一些代码

@Component
@Slf4j
public class KafkaConsumer {

private final CommandRunnerService commandRunnerService;

public KafkaConsumer(CommandRunnerService commandRunnerService) {
this.commandRunnerService = commandRunnerService;
}

@StreamListener(KafkaStreams.INPUT)
public void handleWorkUnit(@Payload Steak steak) {
commandRunnerService.executeCreateSteak(steak);
}
}

handleWorkUnit 需要几个小时才能完成。所以我的修复尝试是

    @StreamListener(KafkaStreams.INPUT)
public void handleWorkUnit(@Payload Steak steak) {
Runnable task = () -> commandRunnerService.executeCreateSteak(steak);
task.run();
}

这没什么区别。

我使用的是开箱即​​用的配置,仅针对消费者设置了非常基础的配置

spring:
application:
cloud:
stream:
kafka:
binder:
brokers: 192.168.0.100
bindings:
consumer-message:
destination: consumer-message
contentType: application/json
consumer-response:
destination: consumer-response
contentType: application/json

以及我正在使用的版本:

ext {
springCloudVersion = 'Finchley.RC1'
}

dependencies {
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
compile('org.springframework.kafka:spring-kafka')
}

如上所述,我在文档和 SO 上看到了许多复杂的示例,但我希望有一个简单的配置修复?或者一些更“初学者”友好的例子。

干杯,

最佳答案

请尝试修复您的代码,如下所示。

@StreamListener(KafkaStreams.INPUT)
public void handleWorkUnit(@Payload Steak steak) {
Runnable task = () -> commandRunnerService.executeCreateSteak(steak);
new Thread(task).start();
}

在您的代码中,您没有创建任何线程。您的代码只是调用 Runnablerun 方法。

相关属性是消费者的max.poll.interval.ms及其默认值是5 分钟。如果您在此期间不调用 poll() 方法,您的经纪人会认为您的消费者失败了。可能这就是你失败的原因(重新平衡和分配)

关于java - spring-kafka 使用非常长的任务多次处理同一条消息。,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50341963/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com