gpt4 book ai didi

apache-kafka - 如何用camel-kafka手动控制offset commit?

转载 作者:行者123 更新时间:2023-12-02 17:16:50 24 4
gpt4 key购买 nike

我正在使用 camel kafka 组件,我不清楚在提交偏移量时发生了什么。如下所示,我正在聚合记录,我认为对于我的用例,只有在记录保存到 SFTP 后提交偏移量才有意义。

是否可以手动控制何时可以执行提交?

private static class MyRouteBuilder extends RouteBuilder {

@Override
public void configure() throws Exception {

from("kafka:{{mh.topic}}?" + getKafkaConfigString())
.unmarshal().string()
.aggregate(constant(true), new MyAggregationStrategy())
.completionSize(1000)
.completionTimeout(1000)
.setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime())
.to("sftp://" + getSftpConfigString())

// how to commit offset only after saving messages to SFTP?

;
}

private final class MyAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
String body = oldBody + newBody;
oldExchange.getIn().setBody(body);
return oldExchange;
}
}
}

private static String getSftpConfigString() {
return "{{sftp.hostname}}/{{sftp.dir}}?"
+ "username={{sftp.username}}"
+ "&password={{sftp.password}}"
+ "&tempPrefix=.temp."
+ "&fileExist=Append"
;
}

private static String getKafkaConfigString() {
return "brokers={{mh.brokers}}"
+ "&saslMechanism={{mh.saslMechanism}}"
+ "&securityProtocol={{mh.securityProtocol}}"
+ "&sslProtocol={{mh.sslProtocol}}"
+ "&sslEnabledProtocols={{mh.sslEnabledProtocols}}"
+ "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}"
+ "&saslJaasConfig={{mh.saslJaasConfig}}"
+ "&groupId={{mh.groupId}}"
;
}

最佳答案

不,你不能。 Kafka 每 X 秒在后台执行一次自动提交(您可以配置它)。

camel-kafka 不支持手动提交。这也是不可能的,因为聚合器与 kafka 消费者是分开的,而它是执行提交的消费者。

关于apache-kafka - 如何用camel-kafka手动控制offset commit?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45947180/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com