- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试读取请求的 kafka 消息数。对于非事务性消息,我们将从 endoffset - N(对于 M 个分区)开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息。对于幂等/事务消息,我们必须考虑事务标记/重复消息,这意味着偏移量将不连续,在这种情况下,endoffset - N 将不会返回 N 条消息,我们需要返回并寻找更多消息,直到我们有 N 条消息对于每个分区或达到开始偏移
由于有多个分区,我需要跟踪所有读取的偏移量,以便在所有操作完成后可以停止。有两个步骤,第一步计算起始偏移量(结束偏移量 - 请求的消息数)和结束偏移量。 (偏移量不连续,有间隙),我会寻找分区从起始偏移量开始消耗。第二步是轮询消息并对每个分区中的消息进行计数,如果我们不满足请求的消息数量,则再次重复第一步和第二步,直到满足每个分区的消息数量。
条件
初始轮询可能不会返回任何记录,因此请继续轮询。当达到每个分区的结束偏移量或轮询不返回结果时停止轮询。检查每个分区读取的消息是否与请求的消息相同。如果是,则标记为完成,如果否,则标记为继续并重复步骤。考虑消息中的间隙。应该适用于事务性和非事务性生产者。
问题:
我将如何跟踪每个分区已读取的所有消息并跳出循环?如果有帮助,每个分区中的消息将按顺序出现。
spring kafka支持这样的用例吗?更多详情可查看here
更新:我要求读取每个分区中的最后 N 条消息。分区和消息数是用户输入的。我想将所有偏移管理保留在内存中。本质上,我们试图按 LIFO 顺序读取消息。这使得它变得棘手,因为卡夫卡允许你向前阅读而不是向后阅读。
最佳答案
为什么有这样的需要,我不明白。当队列中没有任何内容时,Kafka 本身会进行管理。如果消息从一个状态跳转到另一个状态,则可以有单独的队列/主题。不过,这里是如何做到这一点的。
当我们使用类似 - 的方式消费来自分区的消息时 -
ConsumerIterator<byte[], byte[]> it = something; //initialize consumer
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
String kafkaMessage = new String(messageAndMetadata.message());
int partition = messageAndMetadata.partition();
long offset = messageAndMetadata.offset();
boolean processed = false;
do{
long maxOffset = something; //fetch from db
//if offset<maxOffset, then process messages and manual commit
//else busy wait or something more useful
}while(processed);
}
我们获取有关偏移量、分区号和消息本身的信息。您可以选择使用此信息执行任何操作。
对于您的用例,您可能还决定将消耗的偏移量保存到数据库中,以便下次可以调整偏移量。另外,我建议关闭连接进行清理,并最终将处理后的偏移量保存到数据库。
关于java - Spring Kafka - 为任何主题的分区消耗最后 N 条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58339639/
这个问题在这里已经有了答案: “return” and “try-catch-finally” block evaluation in scala (2 个回答) 7年前关闭。 为什么method1返
我有一个动态列表,需要选择最后一项之前的项目。 drag your favorites here var lastLiId = $(".album
我想为每个线程执行特定操作,因此,我认为tearDown Thread Group 不起作用。 是否有任何替代方法可以仅在线程的最后一次迭代时运行“仅一次 Controller ”? 谢谢。 最佳答案
在我的书中它使用了这样的东西: for($ARGV[0]) { Expression && do { print "..."; last; }; ... } for 循环不完整吗?另外,do 的意义何
我想为每个线程执行特定操作,因此,我认为tearDown Thread Group 不起作用。 是否有任何替代方法可以仅在线程的最后一次迭代时运行“仅一次 Controller ”? 谢谢。 最佳答案
有没有可能 finally 不会被调用但应用程序仍在运行? 我在那里释放信号量 finally { _semParallelUpdates.Re
我收藏了 对齐的元素,以便它们形成两列。使用 nth-last-child 的组合和 nth-child(even) - 或任何其他选择器 - 是否可以将样式应用于以下两者之一:a)最后两个(假设
我正在阅读 Jon Skeet 的 C# in Depth . 在第 156 页,他有一个示例, list 5.13“使用多个委托(delegate)捕获多个变量实例化”。 List list = n
我在 AM4:AM1000 范围内有一个数据列表(从上到下有间隙),它总是被添加到其中,我想在其中查找和总结最后 4 个结果。但我只想找到与单独列相对应的结果,范围 AL4:AL1000 等于单元格
我最近编写了一个运行良好的 PowerShell 脚本 - 然而,我现在想升级该脚本并添加一些错误检查/处理 - 但我似乎被第一个障碍难住了。为什么下面的代码不起作用? try { Remove-
这个问题在这里已经有了答案: Why does "a == x or y or z" always evaluate to True? How can I compare "a" to all of
使用 Django 中这样的模型,如何检索 30 天的条目并计算当天添加的条目数。 class Entry(models.Model): ... entered = models.Da
我有以下代码。 public static void main(String[] args) { // TODO Auto-generated method stub
这个问题在这里已经有了答案: Why does "a == x or y or z" always evaluate to True? How can I compare "a" to all of
这个问题已经有答案了: Multiple returns: Which one sets the final return value? (7 个回答) 已关闭 8 年前。 我正在经历几个在工作面试中
$ cat n2.txt apn,date 3704-156,11/04/2019 3704-156,11/22/2019 5515-004,10/23/2019 3732-231,10/07/201
我可以在 C/C++ 中设置/禁用普通数组最后几个元素的读(或写)访问权限吗?由于我无法使用其他进程的内存,我怀疑这是可能的,但如何实现呢?我用谷歌搜索但找不到。 如果可以,怎样做? 因为我想尝试这样
我想使用在这里找到的虚拟键盘组件 http://www.codeproject.com/KB/miscctrl/touchscreenkeyboard.aspx就像 Windows 中的屏幕键盘 (O
我正在运行一个 while 循环来获取每个对话的最新消息,但是我收到了错误 [18-Feb-2012 21:14:59] PHP Warning: mysql_fetch_array(): supp
这个问题在这里已经有了答案: How to get the last day of the month? (44 个答案) 关闭 8 年前。 这是我在这里的第一篇文章,所以如果我做错了请告诉我...
我是一名优秀的程序员,十分优秀!