
apache-kafka - Kafka Exactly Once with a transactional producer


I am trying to fully understand Kafka exactly-once semantics using a transactional producer/consumer.

I came across the example below, but I still find exactly-once hard to grasp. Is this code correct?

producer.sendOffsetsToTransaction - what does this call actually do? Should it be done against the same target topic?

What happens if the system crashes before consumer.commitSync()? Will the same messages be read again and produce duplicates?

public class ExactlyOnceLowLevel {

    public void runConsumer() throws Exception {
        final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
        final Producer<Long, String> producer = createProducer();

        producer.initTransactions();

        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

            try {
                final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
                producer.beginTransaction();
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());

                    final ProducerRecord<Long, String> producerRecord =
                            new ProducerRecord<>(TOPIC_1, new BigInteger(record.key()).longValue(),
                                    new String(record.value(), StandardCharsets.UTF_8)); // decode the byte[] value, not toString()
                    // send returns a Future
                    final RecordMetadata metadata = producer.send(producerRecord).get();
                    currentOffsets.put(new TopicPartition(TOPIC_1, record.partition()), new OffsetAndMetadata(record.offset()));
                }
                producer.sendOffsetsToTransaction(currentOffsets, "my-transactional-consumer-group"); // a bit annoying here to reference the group id twice
                producer.commitTransaction();
                consumer.commitSync();
                currentOffsets.clear();
                // EXACTLY ONCE!
            }
            catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                e.printStackTrace();
                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            }
            catch (final KafkaException e) {
                e.printStackTrace();
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            finally {
                producer.flush();
                producer.close();
            }
        }
    }

    private static KafkaConsumer<byte[], byte[]> createConsumer() {
        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // the consumer is typed <byte[], byte[]>, so both deserializers must produce byte[]
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // this has to be read_committed

        return new KafkaConsumer<>(consumerConfig);
    }

    private static Producer<Long, String> createProducer() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // the producer is typed <Long, String>, so use serializers, not deserializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // required before initTransactions(); any unique id
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // this is now safe !!!!
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // this has to be all
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // this has to be 1

        return new KafkaProducer<>(props);
    }

    public static void main(final String... args) throws Exception {
        final ExactlyOnceLowLevel example = new ExactlyOnceLowLevel();
        example.runConsumer();
    }
}

Best Answer

When using the read-process-write pattern with Kafka transactions, you should not try to commit offsets via the consumer. As you hinted, this can cause issues: a consumer commit is separate from the transaction, so a crash between the two commits, or committing offsets for a transaction that ends up aborted, leaves the output records and the offsets out of sync.

In this use case the offsets need to be added to the transaction itself, and sendOffsetsToTransaction() is the only method you should use to do that. It ensures those offsets are committed only if the full transaction commits successfully. See the Javadoc:

Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. Thus, the specified consumerGroupId should be the same as config parameter group.id of the used consumer. Note, that the consumer should have enable.auto.commit=false and should also not commit offsets manually (via sync or async commits).
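To make this concrete, here is a minimal sketch of the question's loop rewritten so the offsets travel inside the transaction. It is an illustration, not the answerer's code: TOPIC, TOPIC_1 and the two ids are placeholders carried over from the question, and error handling is trimmed for brevity.

producer.initTransactions();
consumer.subscribe(Collections.singletonList(TOPIC));

while (true) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty())
        continue;

    producer.beginTransaction();
    try {
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            producer.send(new ProducerRecord<>(TOPIC_1,
                    new BigInteger(record.key()).longValue(),
                    new String(record.value(), StandardCharsets.UTF_8)));
            // track offsets against the *source* topic/partition, pointing at the
            // next record to read: lastProcessedMessageOffset + 1, as the Javadoc says
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // offsets are committed atomically with the produced records;
        // there is no consumer.commitSync() anywhere
        producer.sendOffsetsToTransaction(offsets, "my-transactional-consumer-group");
        producer.commitTransaction();
    } catch (final KafkaException e) {
        // neither the output records nor the offsets were committed, so the batch
        // is re-read on restart (or after seeking back to the committed offsets);
        // read_committed consumers never see the aborted output
        producer.abortTransaction();
    }
}

This also answers the crash question: if the process dies before commitTransaction(), the transaction aborts, the offsets were never committed, and the batch is simply re-read on restart; if it dies after, both the output records and the offsets were committed atomically, so nothing is reprocessed.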

Regarding "apache-kafka - Kafka Exactly Once with a transactional producer", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51739191/
