- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
问题:如何在 flink 代码中查询 kafka 主题以获取特定消费者组的偏移量? (还有附带问题(如果需要,我会在这里提出一个新问题)。如果可能的话,我怎样才能获得该偏移量的时间戳?
(我发现有 cli 工具可以查询它,但这不是我想要的,因为它不是在 flink 作业中以编程方式完成的)
关于整个问题的一些额外背景,但我不想让它过于开放。
我有一个用例,其中数据将从 kafkaTopic1 流入程序(我们称之为 P1),进行处理,然后持久保存到数据库中。 P1 将在一个多节点集群上,因此每个节点将处理多个 kafka 分区(假设该主题有 5 个节点和 50 个 kafka 分区)。如果其中一个节点由于某种原因完全失败并且正在处理数据,那么该数据将会丢失。
比如kafkaTopic1上有500条消息,node2已经拉取了10条消息(那么根据offset下一条要拉取的消息是11条消息)但是当节点失败,仍在处理的 2 将丢失。当节点重新启动时,它将开始读取消息 11,跳过两条丢失的消息(从技术上讲,kafka 分区将开始将其消息发送到另一个节点进行处理,因此该分区的偏移量将移动,我们不会必须确切地知道节点死亡时下一个要处理的消息)。
(注意:当节点死亡时,假设用户注意到并完全关闭 P1,因此此时不会处理更多数据,暂时)。
这就是 flink 发挥作用的地方。我想做一个 flink 作业,可以通过用户的参数告诉 P1 的消费者组,然后查询 kafka 主题(也由用户提供)以获取当前偏移量(OS1)。然后,flink 作业将其 kafkaTopic1 的偏移量设置为 OS1 之前的 X 时间量(X 由用户通过 args 提供)并开始从 kafka 主题读取消息。然后它将读取的每条消息与数据库中的消息进行比较,如果在数据库中没有找到该消息,它会将其发送到另一个 kafka 主题 (kafkaTopic2),以便在 P1 重新启动时进行处理。
最佳答案
如果在 flink 作业中启用了检查点,那么您不应该丢失消息,因为 flink 也在内部维护偏移量,并且在从故障中恢复后,它应该从 flink 上次提交的偏移量读取。
现在,如果您仍然想找到偏移量并重新从偏移量读取,这会变得很棘手,因为您需要找到给定消费者组给定主题的所有分区的偏移量。
我不知道如何从开箱即用的 Flink-kafka-Consumer API 做到这一点,但您可以将 kafka 依赖项添加到您的项目并从 Kafka API 创建一个 kafkaconsumer。一旦有了消费者,就可以调用
consumer.position(partition)
或
consumer.committed(partition)
Mind that you still need to loop over all the partitions to get all the current offsets
在此处了解差异:Kafka Javadoc
一旦有了要读取的偏移量,就可以使用类似以下内容在 flink 作业中手动指定消费者偏移量:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
有关 Flink-kafka-Consumer 的更多信息,请查看 Flink Kafka Connector
关于java - Flink - 查询Kafka主题以获取消费者组的偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45718240/
我需要您在以下方面提供帮助。近一个月来,我一直在阅读有关任务和异步的内容。 我想尝试在一个简单的 wep api 项目中实现我新获得的知识。我有以下方法,并且它们都按预期工作: public Htt
我的可执行 jar 中有一个模板文件 (.xls)。不需要在运行时我需要为这个文件创建 100 多个副本(稍后将唯一地附加)。用于获取 jar 文件中的资源 (template.xls)。我正在使用
我在查看网站的模型代码时对原型(prototype)有疑问。我知道这对 Javascript 中的继承很有用。 在这个例子中... define([], function () { "use
影响我性能的前三项操作是: 获取滚动条 获取偏移高度 Ext.getStyle 为了解释我的应用程序中发生了什么:我有一个网格,其中有一列在每个单元格中呈现网格。当我几乎对网格的内容做任何事情时,它运
我正在使用以下函数来获取 URL 参数。 function gup(name, url) { name = name.replace(/[\[]/, '\\\[').replace(/[\]]/,
我最近一直在使用 sysctl 来做很多事情,现在我使用 HW_MACHINE_ARCH 变量。我正在使用以下代码。请注意,当我尝试获取其他变量 HW_MACHINE 时,此代码可以完美运行。我还认为
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 关闭 9 年前。 要求提供代码的问题必须表现出对所解决问题的最低限度的理解。包括尝试过的解决方案、为什么
由于使用 main-bower-files 作为使用 Gulp 的编译任务的一部分,我无法使用 node_modules 中的 webpack 来require 模块code> dir 因为我会弄乱当
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 5 年前。 Improve this qu
我使用 Gridlayout 在一行中放置 4 个元素。首先,我有一个 JPanel,一切正常。对于行数变大并且我必须能够向下滚动的情况,我对其进行了一些更改。现在我的 JPanel 上添加了一个 J
由于以下原因,我想将 VolumeId 的值保存在变量中: #!/usr/bin/env python import boto3 import json import argparse import
我正在将 MSAL 版本 1.x 更新为 MSAL-browser 的 Angular 。所以我正在尝试从版本 1.x 迁移到 2.X.I 能够成功替换代码并且工作正常。但是我遇到了 acquireT
我知道有很多关于此的问题,例如 Getting daily averages with pandas和 How get monthly mean in pandas using groupby但我遇到
This is the query string that I am receiving in URL. Output url: /demo/analysis/test?startDate=Sat+
我正在尝试使用 javascript 中的以下代码访问 Geoserver 层 var gkvrtWmsSource =new ol.source.ImageWMS({ u
API 需要一个包含授权代码的 header 。这就是我到目前为止所拥有的: var fullUrl = 'https://api.ecobee.com/1/thermostat?json=\{"s
如何获取文件中的最后一个字符,如果是某个字符,则删除它而不将整个文件加载到内存中? 这就是我目前所拥有的。 using (var fileStream = new FileStream("file.t
我是这个社区的新手,想出了我的第一个问题。 我正在使用 JSP,我成功地创建了 JSP-Sites,它正在使用jsp:setParameter 和 jsp:getParameter 具有单个字符串。
在回答 StoreStore reordering happens when compiling C++ for x86 @Peter Cordes 写过 For Acquire/Release se
我有一个函数,我们将其命名为 X1,它返回变量 Y。该函数在操作 .on("focusout", X1) 中使用。如何获取变量Y?执行.on后X1的结果? 最佳答案 您可以更改 Y 的范围以使其位于函
我是一名优秀的程序员,十分优秀!