gpt4 book ai didi

java - 如何使用Apache Kafka实现 "Exactly once"kafka消费者?

转载 作者:行者123 更新时间:2023-12-01 16:54:18 24 4
gpt4 key购买 nike

我们有一个应用程序,它使用来自 Kafka 主题(3 个分区)的消息并丰富数据并将记录保存在数据库(Spring JPA)中,然后将消息发布到另一个 Kafka 主题(在同一个代理上)以及所有这是使用 Camel 2.4.1 和 Spring Boot 2.1.7.RELEASE 精心策划的。

我们希望为 kafka 消费者-生产者组合实现“恰好一次”语义。

消费者设置:

   autoOffsetReset: earliest
autoCommitEnable: false
allowManualCommit: true
breakOnFirstError: true
group.id : CONSUMER.GROUP.ID
count: 3
max.poll.records = 1 # rollback when message processing fails.

生产者设置:

   idempotence: true
transactionIdPrefix: txn-prefix-id

Bean 接线:

   @Bean
SpringTransactionPolicy springTransactionPolicy() throws Exception {
SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
txRequired.setTransactionManager(transactionManager());
txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return txRequired;
}

@Bean
public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<byte[], byte[]>(
kafkaConfigs());
// enable transaction manager
defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
return defaultKafkaProducerFactory;
}


@Bean
@Primary
public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
return new ChainedKafkaTransactionManager<>(kafkaTransactionManager(),jpaTransactionManager());
}

@Bean
public PlatformTransactionManager kafkaTransactionManager() {
KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);
kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
kafkaTransactionManager.setRollbackOnCommitFailure(true);
return kafkaTransactionManager;
}

@Bean
JpaTransactionManager jpaTransactionManager() {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setRollbackOnCommitFailure(true);
return transactionManager;
}

Camel 路线:

public RoutesBuilder inboundRoute() {
return new RouteBuilder() {

@Override
public void configure() throws Exception {
//Common error handler
onException(UnsupportedMessageTypeException.class).
maximumRedeliveries(redeliveryCount).
handled(true).
bean(ExceptionPropagatorProcessor.class, "process").
bean(manualCommitProcessor).
end();

onException(AppRuntimeException.class).
maximumRedeliveries(redeliveryCount).
bean(ExceptionPropagatorProcessor.class, "process")
end();

onException(RetryExhaustedException.class).
maximumRedeliveries(0).// No retry for this exception
handled(true).
bean(ExceptionPropagatorProcessor.class, "process").
bean(kafkaManualCommitProcessor).
end();

from("kafka:inboundTopic").
routeId("consume-msg").
transacted("springTransactionPolicy").
bean(transactionBeginProcessor).
//check if this is a retry scenario, the max retry count reached then throw RetryExhaustedException.
bean(retryEvaluationProcessor).
bean(enrichProcessor). // publish kafka messages
bean(persistenceProcessor).
bean(transactionEndProcessor). // publish kafka messages
bean(manualCommitProcessor);

但是当存在异常处理场景时,我们无法让 kafka 生产者提交消息。我缺少什么,正确的方法是什么?

最佳答案

您似乎正在使用 Spring Kafka,并且他们的 KafkaTransactionManager 不是真正支持 XA 的事务管理器(有关限制,请参阅他们的文档),因此您不能使用它来回滚 Kafka 和 JDBC 数据库等。

camel-kafka 还不支持 Kafka 事务。我已经创建了一张票:https://issues.apache.org/jira/browse/CAMEL-15016

关于java - 如何使用Apache Kafka实现 "Exactly once"kafka消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61621755/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com