- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我已经建立了一个实验性的 Kafka 环境,其中包含 3 个代理和一个包含 3 个分区的主题。我有一个生产者和一个消费者。我想为特定消费者修改分区的偏移量。我在 kafka 文档中读到,kafka 中的消费者提交/获取 API 可以提交特定的偏移量或获取消费者读取的最新偏移量。这是 API 的链接:
我已经使用下面页面中的代码来编写我的代码,以便从特定的消费者那里获取偏移量。但是,获取 API 为请求的偏移量返回值“-1”。这是示例代码:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
我还在第一个链接中读到“如果没有与该消费者组下的主题分区关联的偏移量,则代理不会设置错误代码(因为它不是真正的错误),但会返回清空元数据并将偏移字段设置为 -1。”
但是我已经生成了一些消息,我的消费者已经使用了这些消息并输出了每条读取消息的偏移量。
如果有人能提供帮助,我将不胜感激。我想知道我的代码的哪一部分是错误的。或者 API 可能有问题。请不要犹豫,提出任何有用的意见。我的代码与我提供的链接中的代码完全一样。但是,如果您需要查看我的代码,请告诉我将其放在此处。
kafka版本为0.10.2.0
我的 Kafka 的配置是:
代理 1:端口 9093
经纪人 2:端口 9094
代理 3:端口 9095
主题:“testpic3”
......................................
消费者配置:
props.put("group.id", "test");
props.put("client.id", "MyConsumer");
................
这是我的代码:
public class KafkaOffsetManage {
public static void main(String[] args) {
BlockingChannel channel = new BlockingChannel("localhost", 9095,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
final String MY_GROUP = "test";
final String MY_CLIENTID = "MyConsumer";
int correlationId = 0;
final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0);
final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1);
final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2);
channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
System.out.println("+++++++++++++++++++++++++++");
System.out.println(metadataResponse.errorCode());
if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
Broker offsetManager = metadataResponse.coordinator();
// if the coordinator is different, from the above channel's host then reconnect
channel.disconnect();
channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
System.out.println("Connected to Offset Manager");
System.out.println(offsetManager.host() + ", Port:"+ offsetManager.port());
} else {
// retry (after backoff)
}
// How to fetch offsets
List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
//partitions.add(testPartition1);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
MY_GROUP,
partitions,
(short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
correlationId,
MY_CLIENTID);
try {
channel.send(fetchRequest.underlying());
OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
short offsetFetchErrorCode = result.error();
if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
channel.disconnect();
// Go to step 1 and retry the offset fetch
} else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
// retry the offset fetch (after backoff)
} else {
long retrievedOffset = result.offset();
String retrievedMetadata = result.metadata();
System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset));
System.out.println(retrievedMetadata);
System.out.println(result.toString());
}
}
catch (Exception e) {
channel.disconnect();
// Go to step 1 and then retry offset fetch after backoff
}
}
}
代码的输出在这里:
+++++++++++++++++++++++++++
0
Connected to Offset Manager
user-virtual-machine, Port:9093
------------------------
The retrieved offset is:-1
OffsetMetadataAndError[-1,,3]
Process finished with exit code 0
关于 Kafka 依赖项有一件奇怪的事情。当我添加此依赖项时,我的代码无法识别程序中的某些类:
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
无法识别类“ConsumerMetadataRequest”和“ConsumerMetadataResponse”。
所以我添加了这个依赖:
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
谢谢,
最佳答案
这种情况的发生是因为抵消过期。 kafka 中有两个参数控制这种行为。首先是“__consumer_offsets”主题的“retention.ms”设置。它应该等于 -1 以禁用该主题内的记录过期。我假设使用 kafka 版本 1.1.x。使用命令检查主题配置:
$ ./kafka-configs.sh --entity-type topics \
--entity-name __consumer_offsets \
--zookeeper localhost:2181 \
--describe
Configs for topic '__consumer_offsets' are compression.type=producer,cleanup.policy=compact,min.insync.replicas=2,segment.bytes=104857600,retention.ms=-1,unclean.leader.election.enable=false
如果不符合配置设置,请使用命令更改它们:
$ ./kafka-configs.sh --entity-type topics \
--entity-name __consumer_offsets \
--zookeeper localhost:2181 \
--alter \
--add-config retention.ms=-1
假设设置了retention policy,接下来需要检查topic中是否有commited message。默认情况下,kafka 不允许读取内部主题。要更改此行为,请创建一个包含消费者设置的文件:
$ echo exclude.internal.topics=false > consumer.properties
之后使用命令阅读“__consumer_offsets”主题:
$ ./kafka-console-consumer.sh --consumer.config consumer.properties \
--from-beginning \
--topic __consumer_offsets \
--zookeeper localhost:2181 \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
如果主题中有内容,输出将如下所示:
[test_client,Test.Purposes,2]::[OffsetMetadata[13,NO_METADATA],CommitTime 1534165245681,ExpirationTime 1534251645681]
[test_client,Test.Purposes,0]::[OffsetMetadata[14,NO_METADATA],CommitTime 1534165245776,ExpirationTime 1534251645776]
[test_client,Test.Purposes,1]::[OffsetMetadata[8,NO_METADATA],CommitTime 1534165690946,ExpirationTime 1534252090946]
这里的 ExpirationTime 值是有意义的。 Group Coordinator 将只读取偏移量加载时未过期的记录,即 now() < ExpirationTime,并将这些值返回给客户端的偏移量获取请求。
ExpirationTime 是在客户端使用公式提交偏移量时计算的:
ExpirationTime = CommitTime + offsets.retention.minutes
offsets.retention.minutes 是经纪人级别的设置,默认情况下它等于 1440(24 小时)。从命令输出中解码 CommitTime 和 ExpirationTime,我们看到
$ date -d @1534165245
Mon Aug 13 16:00:45 UTC 2018
$ date -d @1534251645
Tue Aug 14 16:00:45 UTC 2018
正好是 24 小时。
所以不正确的偏移量问题的解决方案是增加“offsets.retention.minutes”设置记住,当系统中有很多死消费者组时,这会影响代理内存使用,并且还会定期提交不变的偏移量以增加到期时间。
关于apache-kafka - kafka consumer fetch API 不返回正确的偏移值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43290387/
我正在尝试从第 4 到 9 页以及第 12 和 13 页上的单元格中清除所有内容(包括图像)。我有以下代码,但它正在清除第 3-9 和 12-15 页中的内容,我不知道为什么。 有什么想法吗? Sub
有没有办法增加极坐标图刻度标签(θ)的填充/偏移? import matplotlib import numpy as np from matplotlib.pyplot import figure,
我正在调用本地 API 并尝试以分页 样式进行操作。我有 n 张图片,我想将它们分成 n/4 行(每行 4 张图片)。因此,我正在调用我的 API,images/count,offset。但不知何故,
我的问题解释起来有点棘手,但无论如何我都会尝试。我有两个水平选项卡,当您单击它们时,会打开一个文本框内容。当他们被点击时,我试图“关注”他们。我在网上找到了很多资料,但除了我在下面显示的这段代码外,没
所以我有一个 float 的 div,我需要它始终向右 200 像素,并填充窗口的其余部分。有没有某种跨浏览器兼容的方法,我可以在不借助 javascript 的情况下使宽度填满页面的其余部分? 最佳
我有以下片段 $('html,body').animate({scrollTop: $('#menu').offset().top}, 'slow'); 单击链接时,我希望浏览器从#menu div
我目前正在为我的应用程序使用 JASidePanel,并且我有一个 UITableViewcontroller 和一个 UIRefreshControl 作为它的 ViewController 之一。
给出以下代码: imshow(np.arange(16*16).reshape(16,16)) cb = colorbar() cb.set_label("Foo") cb.set_ticks([0,
我是编程新手,我认为 VBA 是一个很好的起点,因为我在 Excel 中做了很多工作。 我创建了一个宏,它从输入框中获取一个整数(我一直使用 2、3 和 4 来测试),并创建该数字的一组 4 层层次结
我在 PHP 中有一个 unix 时间戳: $timestamp = 1346300336; 然后我有一个我想要应用的时区的偏移量。基本上,我想应用偏移量并返回一个新的 unix 时间戳。偏移量遵循这
演示:http://jsfiddle.net/H45uY/6/ 我在这里想做的是将 的左上角设为跟随鼠标。代码在没有段落的情况下工作正常(请参阅上面的演示),但是当您添加段落时,被向上推,鼠标位于盒
假设我们有两个由无符号长(64 位)数组表示的位图。我想使用特定的移位(偏移)合并这两个位图。例如,将位图 1(较大)合并到位图 2(较小)中,起始偏移量为 3。偏移量 3 表示位图 1 的第 3 位
通过在 pageViewController 中实现 tableView,tableView 与其显示的内容不一致。对此最好的解决办法是什么? 最佳答案 如果您的 TableView 是 View C
我设置了一个在 nib 中显示地点信息的地点配置文件。当我在标准屏幕流程中推送此 View 时,它工作正常。但是,当我从另一个选项卡推送此 View 时,UINavigationBar 似乎抵消了它,
如果我想选择 5 条记录,我会这样做: SELECT * FROM mytable LIMIT 5 如果我想添加偏移量,我会这样做: SELECT * FROM mytable OFFSET 5 LI
我有一个应用程序,其中某些 View 需要全屏,而其他 View 不需要全屏。在某些情况下,我希望背景显示在状态栏下方,所以我在 View 加载时使用它来使 Activity 全屏显示: window
在下图中,我进行绘制,结果位于 A 点,就在我手指接触的地方。 如何使图像显示在实际触摸上方约 40pt。 (二) 我正在使用经典的 coreGraphic UITouch 代码,如下所示: - (v
只要键盘处于事件状态,我就会尝试偏移 UITextField,效果很好,直到我尝试了表情符号布局。有没有办法检测键盘输入的类型,以便找出高度差?谢谢 最佳答案 不是使用 UIKeyboardDidSh
这是我的 Swift 代码 (AppDelegate.swift): var window: UIWindow? var rootViewController :UIViewController? f
我有一个 div 作为绝对定位的 body 的直接子节点,其 css 属性定义如下: div[id^="Container"] { display: block; position: a
我是一名优秀的程序员,十分优秀!