
java - Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction


I have read a lot of Gary Russell's answers and posts, but I haven't found an actual solution for the common use case of synchronizing the following sequence:

receive from topic A => save to DB via Spring-data => send to topic B

As I understand it correctly: fully atomic processing cannot be guaranteed in this case, and I need to handle message de-duplication on the client side. But the main problem is that the ChainedKafkaTransactionManager doesn't synchronize with the JpaTransactionManager (see the @KafkaListener below).
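For reference, client-side de-duplication is commonly implemented by making the database write idempotent. Below is a minimal sketch of that idea; all names (ImportedRecord, repo, the derived query) are hypothetical and not part of the configuration that follows:

// Hypothetical entity: a unique constraint on the Kafka coordinates of a record
// turns re-processing of a redelivered message into a detectable duplicate.
@Entity
@Table(name = "imported_record",
       uniqueConstraints = @UniqueConstraint(columnNames = { "topic", "kafka_partition", "kafka_offset" }))
public class ImportedRecord {

    @Id
    @GeneratedValue
    private Long id;

    @Column(name = "topic")
    private String topic;

    @Column(name = "kafka_partition")
    private int kafkaPartition;

    @Column(name = "kafka_offset")
    private long kafkaOffset;

    @Lob
    private byte[] payload;

    // constructors, getters and setters omitted for brevity
}

// Hypothetical check before persisting: a record that already exists was saved
// by a previously committed transaction and can safely be skipped.
if (!repo.existsByTopicAndKafkaPartitionAndKafkaOffset(topic, partition, offset)) {
    repo.save(new ImportedRecord(topic, partition, offset, payload));
}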

Kafka config:

// Config-key constants (BOOTSTRAP_SERVERS_CONFIG, etc.) are statically imported from
// ConsumerConfig/ProducerConfig; SYNCHRONIZATION_ON_ACTUAL_TRANSACTION from
// AbstractPlatformTransactionManager.
@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    @Bean
    public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(MAX_POLL_RECORDS_CONFIG, 10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
        props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");

        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
            @Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency
    ) {

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTM);

        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
        arbp.setCommitRecovered(true);
        arbp.setKafkaTemplate(kafkaTemplate);

        factory.setAfterRollbackProcessor(arbp);
        factory.setConcurrency(concurrency);

        factory.afterPropertiesSet();

        return factory;
    }

    @Bean
    public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        configProps.put(BATCH_SIZE_CONFIG, 16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);

        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
        kafkaProducerFactory.setTransactionIdPrefix("kafka-tx-");

        return kafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        KafkaTransactionManager<String, byte[]> ktm = new KafkaTransactionManager<>(producerFactory);
        ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

    @Bean
    public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
            KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
    }

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);
    }
}

Kafka listener:

@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) {
    for (byte[] record : records) {
        // causes an infinite rollback loop (perhaps due to the batch listener)
        if (true)
            throw new RuntimeException("foo");

        // Spring-data storage with @Transactional("chainedKafkaTM"), since Spring-data
        // can't determine the TM among transactionManager, chainedKafkaTM, kafkaTransactionManager
        var result = storageService.persist(record);

        kafkaTemplate.send(result);
    }
}

Spring-Kafka version: 2.3.3, Spring Boot version: 2.2.1

What is the proper way to implement such a use case? The Spring-Kafka documentation is limited to small/specific examples.

P.S. When I use @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) on the @KafkaListener method, I still face an endless cyclic rollback, even though FixedBackOff(1000L, 3L) is set.

EDIT: I plan to achieve the maximum possible synchronization between the listener, the producer and the database, with a configurable number of retries.

EDIT: The snippets above were edited according to the suggested configuration. Using the ARBP doesn't solve my infinite rollback loop, because the predicate of the first statement is always false (SeekUtils.doSeeks):

DefaultAfterRollbackProcessor
...
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
        boolean recoverable) {

    if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
            getSkipPredicate((List) records, exception), LOGGER)
            && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
        ConsumerRecord<K, V> skipped = records.get(0);
        this.kafkaTemplate.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                        new OffsetAndMetadata(skipped.offset() + 1)));
    }
}

It is worth mentioning that there is no active transaction inside the Kafka consumer method (checked via TransactionSynchronizationManager.isActualTransactionActive()).
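A quick way to check this from inside the listener (a debugging snippet, not part of the original post):

// Logs whether Spring has synchronized an actual transaction on the listener
// thread, and the name of the transaction currently in effect (if any).
log.info("tx active: {}, tx name: {}",
        TransactionSynchronizationManager.isActualTransactionActive(),
        TransactionSynchronizationManager.getCurrentTransactionName());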

Best Answer

What makes you think it's not synchronized? You really don't need @Transactional, since the container will start the two transactions.

You should not use a SeekToCurrentErrorHandler with transactions, because that runs within the transaction. Configure the after-rollback processor instead. The default ARBP uses FixedBackOff(0L, 9) (10 attempts).

This works fine for me; it stops after 4 delivery attempts:

@SpringBootApplication
public class So58804826Application {

    public static void main(String[] args) {
        SpringApplication.run(So58804826Application.class, args);
    }

    @Bean
    public JpaTransactionManager transactionManager() {
        return new JpaTransactionManager();
    }

    @Bean
    public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
            KafkaTransactionManager<?, ?> kafka) {

        kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(kafka, jpa);
    }

    @Autowired
    private Saver saver;

    @KafkaListener(id = "so58804826", topics = "so58804826")
    public void listen(String in) {
        System.out.println("Storing: " + in);
        this.saver.save(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58804826")
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            // template.executeInTransaction(t -> t.send("so58804826", "foo"));
        };
    }

}

@Component
class ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            ChainedKafkaTransactionManager<?, ?> tm) {

        factory.getContainerProperties().setTransactionManager(tm);
        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L, 3)));
    }

}

@Component
class Saver {

    @Autowired
    private MyEntityRepo repo;

    private final AtomicInteger ids = new AtomicInteger();

    @Transactional("chainedTxM")
    public void save(String in) {
        this.repo.save(new MyEntity(in, this.ids.incrementAndGet()));
        throw new RuntimeException("foo");
    }

}

I see "Participating in existing transaction" from both TxMs.

With @Transactional("transactionManager"), I just get it from the JPA TM, as one would expect.

EDIT

A batch listener has no concept of "recovery"; the framework cannot know which record in the batch needs to be skipped. In 2.3, we added a new feature for batch listeners when using manual ack modes.

See Committing Offsets.

Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

When using a batch listener, you can specify the index within the batch where the failure occurred. When nack() is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll(). This is an improvement over the SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.
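As a rough sketch of how that could look in listener code (assuming a container factory configured with AckMode.MANUAL and batch listening enabled; the names "batchNack", "topic-a", "manualAckBatchFactory" and process() are illustrative, not from the answer):

@KafkaListener(id = "batchNack", topics = "topic-a", containerFactory = "manualAckBatchFactory")
public void listen(List<ConsumerRecord<String, byte[]>> records, Acknowledgment ack) {
    for (int i = 0; i < records.size(); i++) {
        try {
            process(records.get(i)); // hypothetical per-record processing
        }
        catch (Exception e) {
            // commits offsets for records before index i, then seeks the failed
            // and remaining records so they are redelivered on the next poll()
            ack.nack(i, 1000L); // nack(int index, long sleep) in spring-kafka 2.3
            return;
        }
    }
    ack.acknowledge(); // the whole batch succeeded
}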

However, the failed record would still be replayed indefinitely.

You could keep track of the record that keeps failing and nack index + 1 to skip it.
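Illustratively, continuing the hypothetical sketch above (and subject to the caveat that follows):

// Track the last failed offset per partition; if the very same record fails
// again after redelivery, nack past it (index + 1) instead of retrying it.
private final Map<TopicPartition, Long> lastFailed = new ConcurrentHashMap<>();

// ...inside the catch block of the batch listener above:
ConsumerRecord<String, byte[]> rec = records.get(i);
TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());
boolean secondFailure = Long.valueOf(rec.offset()).equals(lastFailed.put(tp, rec.offset()));
if (!secondFailure) {
    ack.nack(i, 1000L);          // first failure: retry from this record
}
else if (i + 1 < records.size()) {
    ack.nack(i + 1, 1000L);      // commit past the poison record, redeliver the rest
}
else {
    ack.acknowledge();           // poison record was the last in the batch
}
return;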

However, since your JPA transaction has been rolled back, this won't work for you.

With a batch listener, you must handle problems with the batch in your listener code.

On "java - Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58804826/
