gpt4 book ai didi

apache-kafka - Flink Kafka Producer 中的 Exactly-once 语义

转载 作者:行者123 更新时间:2023-12-01 21:55:50 29 4
gpt4 key购买 nike

我正在尝试使用 Kafka Source 和 Sink 测试 Flink exactly-once 语义:

  1. 运行 flink 应用程序,简单地将消息从一个主题传输到另一个主题,并行度 = 1,检查点间隔 20 秒
  2. 每 2 秒使用 Python 脚本生成带有递增整数的消息。
  3. 使用 read_committed 隔离级别的控制台消费者读取输出主题。
  4. 手动终止 TaskManager

无论 TaskManager 是否终止和恢复,我都希望在输出主题中看到单调递增的整数。

但实际上在控制台消费者输出中看到了一些意想不到的东西:

32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45

看起来像在输出主题中重放的检查点之间的所有消息。它应该是正确的行为还是我做错了什么?

恢复了一个快照: Flink UI

我的 Flink 代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));

Properties producerProperty = new Properties();
producerProperty.setProperty("bootstrap.servers", ...);
producerProperty.setProperty("zookeeper.connect", ...);
producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"10000");
producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

Properties consumerProperty = new Properties();
consumerProperty.setProperty("bootstrap.servers", ...);
consumerProperty.setProperty("zookeeper.connect", ...);
consumerProperty.setProperty("group.id", "test2");

FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>("test", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
producer1.ignoreFailuresAfterTransactionTimeout();
DataStreamSource<String> s1 = env.addSource(consumer1);
s1.addSink(producer1);
env.execute("Test");
}

最佳答案

除了为生产者设置exactly-once语义,你还需要配置消费者只读取kafka提交的消息。默认情况下,消费者将读取已提交和未提交的消息。将此设置添加到您的消费者应该会让您更接近您想要的行为。

consumerProperties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

关于apache-kafka - Flink Kafka Producer 中的 Exactly-once 语义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57308590/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com