gpt4 book ai didi

java - Producer#initTransactions 不适用于 KafkaContainer

转载 作者:行者123 更新时间:2023-11-30 01:53:29 24 4
gpt4 key购买 nike

我尝试通过事务向 Kafka 发送消息。所以,我使用这段代码:

 try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) {
producer.initTransactions();
producer.beginTransaction();
Arrays.stream(messages).forEach(
message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
producer.commitTransaction();
}

...

private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) {
return new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
),
new VoidSerializer(),
new StringSerializer());
}

如果我使用本地 Kafka,效果很好。

但是如果我使用 Kafka TestContainers,它就会卡住在 Producer.initTransactions() 上:

private static final String KAFKA_VERSION = "4.1.1";

@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
.withEmbeddedZookeeper();

如何配置 KafkaContainer 来处理事务?

最佳答案

尝试使用Kafka for JUnit而不是 Kafka 测试容器。我在交易方面也遇到了同样的问题,并通过这种方式让它们活跃起来。

我使用的 Maven 依赖项:

<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>

关于java - Producer#initTransactions 不适用于 KafkaContainer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55260840/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com