gpt4 book ai didi

apache-flink - 如何在 apache kafka 连接器中实现 exactly once 语义

转载 作者:行者123 更新时间:2023-12-05 02:58:31 28 4
gpt4 key购买 nike

我使用的是 flink 版本 1.8.0 。我的应用程序从 kafka 读取数据 -> 转换 -> 发布到 Kafka。为了避免在重启期间出现任何重复,我想使用具有 Exactly once 语义的 kafka 生产者,请在此处阅读:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-011-and-newer

我的kafka版本是1.1。

        return new FlinkKafkaProducer<String>( topic,  new KeyedSerializationSchema<String>() {


public byte[] serializeKey(String element) {
// TODO Auto-generated method stub
return element.getBytes();
}


public byte[] serializeValue(String element) {
// TODO Auto-generated method stub
return element.getBytes();
}


public String getTargetTopic(String element) {
// TODO Auto-generated method stub
return topic;
}
},prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);

检查点代码:

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(15 * 1000 );
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.enableCheckpointing(5000 );

如果我在 kafka producer 中添加了 exactly once sematics,我的 flink consumer 不会读取任何新数据。

任何人都可以与 Exactly once Semantics 分享任何示例代码/应用程序吗?

请在这里找到完整的代码:

https://github.com/sris2/sample_flink_exactly_once

谢谢

最佳答案

Can any one please share any sample code/application with Exactly once Semantics ?

一个恰好一次的例子隐藏在 end-to-end test in flink 中.由于它使用了一些方便的功能,如果不检查整个 repo 可能很难理解。

If I add exactly once sematics in kafka producer , my flink consumer is not reading any new data. [...] Please find complete code here :

https://github.com/sris2/sample_flink_exactly_once

我检查了您的代码并发现了问题(必须修复整个设置/代码才能真正运行)。接收器实际上无法正确配置事务。如 Flink Kafka connector documentation 中所写,您需要将 Kafka 代理中的 transaction.timeout.ms 调整为最多 1 小时或将应用程序中的时间调整为 15 分钟:

    prop.setProperty("transaction.timeout.ms", "900000");

相应的摘录是:

Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. This property will not allow to set transaction timeouts for the producers larger than it’s value. FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.

关于apache-flink - 如何在 apache kafka 连接器中实现 exactly once 语义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58961258/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com