gpt4 book ai didi

java - 使用 KafkaTransactionManager 在事务 KafkaTemplate 中进行基于事件的提交

转载 作者:行者123 更新时间:2023-12-02 10:48:46 24 4
gpt4 key购买 nike

Spring管理的KafkaTemplate提供

template.send(record).addCallback(...
template.executeInTransaction(...

现在假设我有一个方法 doWork(),它是在事件(例如 TCP/IP 消息)上触发的。

@Autowired
KafkaTemplate template;

// This method is triggered on a event
doWork(EventType event){
switch(event){
case Events.Type1 :
template.send(record); break;
case Events.Type2 :
// Question : How do I achieve a commit of all my previous sends here?
default : break;
}
}

基本上,我需要通过在 doWork() 上添加 @Transaction 或

template.executeInTransaction(...

在代码中。但我想批量处理几个 [template.send()] 并在几次调用 doWork() 方法后进行提交,我该如何实现这一点?

我的生产者配置启用了事务,并将 KafkaTransactionManager 连接到生产者工厂。

最佳答案

kafkaTemplate.executeInTransaction(t -> {
boolean stayIntransaction = true;
while (stayInTransaction) {
Event event = readTcp()
doWork(event);
stayInTransaction = transactionDone(event);
}
}

只要doWork()方法使用相同的模板,并且它在回调范围内运行,工作将在事务中运行。

或者

@Transactional
public void doIt() {
boolean stayIntransaction = true;
while (stayInTransaction) {
Event event = readTcp()
doWork(event);
stayInTransaction = transactionDone(event);
}
}

使用声明式事务时。

如果 TCP 事件是异步的,您将需要以某种方式将它们交给运行事务的线程,例如使用 BlockingQueue<?> .

关于java - 使用 KafkaTransactionManager 在事务 KafkaTemplate 中进行基于事件的提交,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52338681/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com