- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我阅读了大量 Gary Russell 的答案和帖子,但没有找到同步以下序列的常见用例的实际解决方案:
recieve from topic A => save to DB via Spring-data => send to topic B
据我正确理解:在这种情况下不能保证完全原子处理,我需要在客户端处理消息重复数据删除,但主要问题是ChainedKafkaTransactionManager 不与 JpaTransactionManager 同步
(参见下面的@KafkaListener
)
卡夫卡配置:
@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {
private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);
@Bean
public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(AUTO_OFFSET_RESET_CONFIG, 'earliest');
props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(MAX_POLL_RECORDS_CONFIG, 10);
props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
props.put(ISOLATION_LEVEL_CONFIG, 'read_committed');
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
@Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
@Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
@Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
@Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency
) {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setTransactionManager(chainedKafkaTM);
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
arbp.setCommitRecovered(true);
arbp.setKafkaTemplate(kafkaTemplate);
factory.setAfterRollbackProcessor(arbp);
factory.setConcurrency(concurrency);
factory.afterPropertiesSet();
return factory;
}
@Bean
public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
configProps.put(BATCH_SIZE_CONFIG, 16384);
configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
kafkaProducerFactory.setTransactionIdPrefix('kafka-tx-');
return kafkaProducerFactory;
}
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
}
@Bean
public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
KafkaTransactionManager kafkaTransactionManager) {
return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}
@Bean(name = "transactionManager")
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
return new JpaTransactionManager(em);
}
}
Kafka 监听器:
@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) {
for (byte[] record : records) {
// cause infinity rollback (perhaps due to batch listener)
if (true)
throw new RuntimeExcetion("foo");
// spring-data storage with @Transactional("chainedKafkaTM"), since Spring-data can't determine TM among transactionManager, chainedKafkaTM, kafkaTransactionManager
var result = storageService.persist(record);
kafkaTemplate.send(result);
}
}
Spring-kafka版本:2.3.3Spring-boot版本:2.2.1
实现此类用例的正确方法是什么?Spring-kafka 文档仅限于小型/特定示例。
P.s. 当我使用 @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class)
时上@KafkaListener
然而,我面临着无休止的循环回滚的方法FixedBackOff(1000L, 3L)
已设置。
编辑:我计划通过可配置的重试次数在监听器、生产者和数据库之间实现最大程度的同步。
编辑:上面的代码片段根据建议的配置进行了编辑。使用 ARBP 并不能解决我的无限回滚循环问题,因为第一个语句的谓词始终为 false ( SeekUtils.doSeeks
):
DefaultAfterRollbackProcessor
...
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
boolean recoverable) {
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
getSkipPredicate((List) records, exception), LOGGER)
&& isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
ConsumerRecord<K, V> skipped = records.get(0);
this.kafkaTemplate.sendOffsetsToTransaction(
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
new OffsetAndMetadata(skipped.offset() + 1)));
}
}
值得一提的是,Kafka Consumer 方法中没有 Activity 事务( TransactionSynchronizationManager.isActualTransactionActive()
)。
最佳答案
是什么让您认为它不同步?您确实不需要 @Transactional
因为容器将启动两个事务。
您不应将 SeekToCurrentErrorHandler
与事务一起使用,因为这发生在事务内。相反,配置回滚后处理器。默认 ARBP 使用 FixedBackOff(0L, 9)
(10 次尝试)。
这对我来说效果很好;并在 4 次传送尝试后停止:
@SpringBootApplication
public class So58804826Application {
public static void main(String[] args) {
SpringApplication.run(So58804826Application.class, args);
}
@Bean
public JpaTransactionManager transactionManager() {
return new JpaTransactionManager();
}
@Bean
public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
KafkaTransactionManager<?, ?> kafka) {
kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager<>(kafka, jpa);
}
@Autowired
private Saver saver;
@KafkaListener(id = "so58804826", topics = "so58804826")
public void listen(String in) {
System.out.println("Storing: " + in);
this.saver.save(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so58804826")
.partitions(1)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
// template.executeInTransaction(t -> t.send("so58804826", "foo"));
};
}
}
@Component
class ContainerFactoryConfigurer {
ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
ChainedKafkaTransactionManager<?, ?> tm) {
factory.getContainerProperties().setTransactionManager(tm);
factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L, 3)));
}
}
@Component
class Saver {
@Autowired
private MyEntityRepo repo;
private final AtomicInteger ids = new AtomicInteger();
@Transactional("chainedTxM")
public void save(String in) {
this.repo.save(new MyEntity(in, this.ids.incrementAndGet()));
throw new RuntimeException("foo");
}
}
我从两个 TxM 中看到“正在参与现有交易”。
使用@Transactional("transactionManager")
,我只是从 JPATm 获取它,正如人们所期望的那样。
编辑
批处理监听器没有“恢复”的概念 - 框架不知道需要跳过批处理中的哪条记录。在 2.3 中,我们为使用手动确认模式时的批处理监听器添加了一项新功能。
Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.
When using a batch listener, you can specify the index within the batch where the failure occurred. When
nack()
is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next poll(). This is an improvement over the SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.
但是,失败的记录仍会无限期地重播。
您可以跟踪不断失败的记录并 nack index + 1
以跳过它。
但是,由于您的 JPA tx 已回滚;这对你不起作用。
使用批处理监听器,您必须处理监听器代码中的批处理问题。
关于java - Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58804826/
只是想知道这些结构之间有什么区别(text、data、rodata、bss 等)在链接描述文件中: .data : { *(.data) } .data : { *(.data*) }
Data 定义为其核心功能之一 gfoldl : gfoldl :: (Data a) => (forall d b. Data d => c (d -> b) -> d -> c b)
以下之间有什么区别:data-sly-use、data-sly-resource、data-sly-include 和 数据-sly-模板?我正在阅读 Sightly AEM 上的文档,我非常困惑。
我有一个 Spring Boot、Spring Data JPA (hibernate) Web 应用程序,并且想引入文本搜索功能。 我理解以下内容 hibernate search 或 spring
我不知道我的代码有什么问题。我读了其他有同样问题的人的一些问题,但没有找到答案。当我尝试编译时出现以下错误: ||In function 'main':| |35|error: expected ex
我不太确定为什么会收到此错误或其含义。我的数据框称为“数据”。 library(dplyr) data %>% filter(Info==1, Male==1) %>% lm(CFL_
我一直在 GitHub 等更现代的网站上看到这些属性,它们似乎总是与自定义的弹出窗口一致,如 title 属性。 Option 1 Option 2 Option 3 Option 4 我在 HTML
如何用 iCloud Core Data 替换我现有的 Core Data?这是我的持久商店协调员: lazy var persistentStoreCoordinator: NSPersistent
我一直在 GitHub 等更现代的网站上看到这些属性,它们似乎总是与自定义的弹出窗口一致,如 title 属性。 Option 1 Option 2 Option 3 Option 4 我在 HTML
我正在通过 this project 在 Android 上摆弄 node.js ,我需要一种方法将 js 文件部署到私有(private)目录(以隐藏源代码,防止用户篡改),该目录也物理存在于文件系
大家好我有点沮丧,所以我希望得到一些帮助。我的项目在 SwiftUI 中。我想使用图像选择器将图像保存到 Core Data。我实现了让 ImagePicker 工作,但我正在努力转换 Image -
我有以下数据和代码: mydf grp categ condition value 1 A X P 2 2 B X P 5
我一直在努力解决这个问题,但我根本找不到任何解决问题的方法。希望这里有人可以提供帮助。 我正在尝试为具有以下结构的某些数据创建个人选择矩阵: # A tibble: 2,152 x 32 a
我了解 Data.Map.Lazy 和 Data.Map.Strict 是不同的。但是,当您导入 Data.Map 时,您究竟导入了什么:严格的、惰性的还是两者的组合? 最佳答案 懒人。看着docs
我正在开发一个 C 程序,用于从 BerkeleyDB DBTree 数据库中提取数据值与特定模式匹配的记录。我创建数据库,打开它,将键的 DBT 和数据的另一个 DBT 清零,将 DBT 标志设置为
所以我有以下成员(member)历史表 User_ID | Start date | End Date | Type(0-7) | ---------------------------
随着最近推出的包dataframe ,我认为是时候正确地对各种数据结构进行基准测试,并突出每种数据结构的优势。我不是每个人的不同优势的专家,所以我的问题是,我们应该如何对它们进行基准测试。 我尝试过的
我有来自 API 的数据,但无法将数组中的数据设置为 vue.js 中的 this.data这是来自 API 的数据(JSON) 你能告诉我这个语法吗 {"id":1613, "name_org":"
在 Vue.js到目前为止,我已经找到了两种定义数据的方法:data: {} 和 data() { return; }. data: { defaultLayout: 'default' }
我正在研究Spring Data Rest Services,并在自定义拦截器中遇到一些问题。之前我使用spring-data-rest-webmvc 2.2.0并以以下方式添加了拦截器。 publi
我是一名优秀的程序员,十分优秀!