- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Kafka Source 和 Sink 测试 Flink exactly-once 语义:
无论 TaskManager 是否终止和恢复,我都希望在输出主题中看到单调递增的整数。
但实际上在控制台消费者输出中看到了一些意想不到的东西:
32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45
看起来像在输出主题中重放的检查点之间的所有消息。它应该是正确的行为还是我做错了什么?
恢复了一个快照: Flink UI
我的 Flink 代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));
Properties producerProperty = new Properties();
producerProperty.setProperty("bootstrap.servers", ...);
producerProperty.setProperty("zookeeper.connect", ...);
producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"10000");
producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
Properties consumerProperty = new Properties();
consumerProperty.setProperty("bootstrap.servers", ...);
consumerProperty.setProperty("zookeeper.connect", ...);
consumerProperty.setProperty("group.id", "test2");
FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>("test", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
producer1.ignoreFailuresAfterTransactionTimeout();
DataStreamSource<String> s1 = env.addSource(consumer1);
s1.addSink(producer1);
env.execute("Test");
}
最佳答案
除了为生产者设置exactly-once语义,你还需要配置消费者只读取kafka提交的消息。默认情况下,消费者将读取已提交和未提交的消息。将此设置添加到您的消费者应该会让您更接近您想要的行为。
consumerProperties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
关于apache-kafka - Flink Kafka Producer 中的 Exactly-once 语义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57308590/
我想在数据中心选择一个事件分区。通常我会使用以下语句: INVANTIVE> use 1552839 2> Exclamation itgendhb077: Error in Invantive Da
我认为我的可能是 git 子模块的最简单用例。 我有一个目录结构 --- --- --- --- 每个子目录都是一个 git 存储库。我只想跟踪在我的 中添加的不同 git
我正在尝试循环数据框中的特定数字列,目标是使用“cor.test”函数提取相关性和 p 值。 相关性在于计算线性关系一个分类变量,由针对每个特定数字列的 0 和 1 值组成。 到目前为止,这是我的代码
当我使用 Invantive Data Hub 从多个 Exact Online 公司下载数据时,我得到了重复的行,而我希望每个公司只有一行。 我使用以下查询: select gla.code ,
我们刚刚上线 https://ecotaksen.be 。 Exact 上的查询和更新运行良好,但安装生产许可证后出现错误 itgenobr001:找不到客户端。。 我的数据容器规范是: 使用具有相
为了遵守法规,我尝试从我的一些部门下载采购发票文件(PDF 文件),将它们保存在磁盘上以供存档。 我使用 Invantive 查询工具来执行此操作。我想知道使用哪个表以及如何仅针对采购发票文档导出这些
我想获取“S-1”之后的链接,而不是“S-1/A”之后的链接。我尝试了“.find_all(lambda tag: tag.name == 'td' and tag.get()==['S-1'])”,
当我尝试通过 Google Colaboratory 中的 Earthengine 命令行上传 .tfrecord 和 .json 文件时,它显示“TfRecord 摄取 list 必须具有一个具有一
Closed. This question is off-topic 。它目前不接受答案。 想改善这个问题吗? Update the question 所以它是堆栈溢出的 on-topic。 10年前
这里给出了一个关于模板消歧器的问题: template disambiguator 在答案中我们可以读到: ISO C++03 14.2/4 When the name of a member tem
我想在考虑时间间隔的同时进行病例对照匹配。如果对照观察的自变量 X1、X2 和重叠时间间隔 X3 与一个案例具有相同的值,我想要一个匹配项。 例如,假设以下 df1: row Y X1 X2
我在这里有一个具有这种起始样式的 HTML 元素: transition: transform 2s; 首先是动画 (它旋转X)通过点击添加的类。下次单击时,将添加另一个类,该类添加了 transfo
我忘了,但是 EAGL 代表什么具体的东西吗?或者它只是核心动画 OpenGL 命名约定的一部分(CAEAGLLayer 等)? 最佳答案 “AGL”是苹果 OS X 的 OpenGL 扩展的名称。我
我们目前正在尝试优化复杂的 Angular 应用程序(性能和包大小)。 我们发现我们有部分未使用的组件,但我们不能 100% 确定它们。无论如何......我们目前要问的问题是,摇树在 Angular
我正在解决简单的优化问题。该数据集有 26 列和 3000 多行。 源代码看起来像 Means <- colMeans(Returns) Sigma <- cov(Returns) invSi
我让 Android Studio 将我的代码转换为 OnClickListener . 显然这里使用了 lambda。我不知道 lambda 是传递给 View 类的函数还是传递给 OnClickL
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 3 年前。 Improve th
关于“转换”的可用(类似)问题并没有真正阐明这是什么或做什么(顺便说一下,刚开始进行 Android 编程)。人们在哪里以及如何注意到“类型转换”的效果? 有什么区别: Button b = (But
我需要创建一个列,其中可以存储“0.0 - 99.99”之间的值。为什么?由于这种情况: 我的数据库中有这个表: "CREATE TABLE dumps( id INT
我正在摸不着头脑,经过一天的互联网搜索,我决定问你这个问题。 我有一个包含 2 个字段 tag_id 和 tag 的表 TAG,我试图将 TAG 的记录与特定字符串完全匹配,但我无法完全匹配,只能部分
我是一名优秀的程序员,十分优秀!