- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我的 Broker 中有一个名为“test”的主题。我用 CLI 检查过。
我创建了一个 java 生产者来将消息发送到主题 test
。我可以从 CLI 中使用它们。
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
(我在 Windows 上运行它)
但是,当我在 Java Consumer 程序中运行它时,即使我将 auto.offset.reset
设置为 earliest
,它也不会消耗任何消息。我究竟做错了什么?
public class Consumer1 {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "jin");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//consumer.subscribe(Collections.singletonList("test"));
consumer.subscribe(Arrays.asList("test"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
//ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
//consumer.commitAsync();
}
} catch (Exception e){
e.printStackTrace();
} finally {
consumer.close();
System.out.println("closed");
}
}
}
最佳答案
auto.offset.reset如果它是一个全新的消费者组,或者消费者组偏移被删除,则该属性将会出现。它不适用于已经在 Kafka 中存储了偏移量的消费者组
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
关于java - 即使使用 auto.offset.reset,我的 Java Consumer 也无法从 Broker 读取消息 - 最早,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57212222/
在为代理到 Zookeeper 身份验证实现 SSL 时,我在代理 2 节点上遇到了以下异常 从 SSL 设置中,集群配置了 SSL 身份验证 Broker 1 成功连接到 zookeeper,但在
我想读取 some_dir/activemq.xml 文件,以便可以使用 org.apache.activemq.broker.BrokerService 创建代理,该代理将具有所提供文件的所有配置。
我在 fi-lab 上创建了一个新的 Orion 实例,以便将它连接到我们管理的 CEP 实例。将存储在我的 Orion 实例上的数据必须由位于 orion.lab.fi-ware.eu:1026 的
我想使用IB Api,但无法弄清楚我们如何请求完整的符号列表和信息。 在我找到的文档中:reqScannerParameters()-但不清楚如何获取例如纳斯达克股票的 list ? 有没有更好的办法
我已经开始将 IB 与 IBridgePy 结合使用,我想知道是否有可能以某种方式执行任何回溯测试,有没有人如何做到这一点? 最佳答案 IB 没有现成的回测/重放工具。基本上,您必须下载历史数据并通过
我是Pact(消费者驱动的测试)的新手,并且使用gradle,我曾在这个著名的研讨会上尝试使用Java和Pact Brocker https://github.com/Mikuu/Pact-JVM-E
在我的IDE中,我能够利用一个spring-boot应用程序,该应用程序将生成消息(使用Kafkaproducer)给外部kafka代理。但是,一旦我将我的spring-boot应用程序托管在dock
我正在用 C# 移植 Java 应用程序,我需要构建嵌入式 ActiveMQ 实例。 在 Java 中,我可以使用 BrokerService 类,但我在 .Net 的 Apache.Nms 命名空间
我的本地计算机上有一个消息生成器,在远程主机 (aws) 上有一个代理。 生产者发送消息后,我等待并调用远程主机上的控制台消费者并查看过多的日志。没有来自生产者的值(value)。 生产者在调用s
我正在开发一个物联网项目,到目前为止一切顺利,我可以使用我在网络上找到的免费代理在客户端之间共享数据... 我的用例是关于温度传感器的,该传感器将消息发布并保留到主题房间/温度 由于消息被保留,cli
我读到的所有地方都说由服务代理处理的消息是按照它们到达的顺序处理的,但是如果你创建一个表、消息类型、契约(Contract)、服务等,并且在激活时有一个存储过程等待 2秒并将消息插入表中,将最大队列读
我有一个 SQL Server 2008 R2 实现,并为在同一个机器上运行的 .Net/IIS 网站打开了 Service Broker。 当 global.asax application_sta
在我的发布场景中,我们有多个部署者将内容推送到文件系统和数据库(代理)。页面和二进制文件放在文件系统上,其他所有内容都放在 Broker 中。我们有一位部署者将内容放入数据库。这是推荐的最佳做法吗?
我们将 MongoDB 用于我们的内部部署环境。 Orion 何时正式支持 AWS DocumentDB Orion 会在支持 DocumentDb 后放弃 MongoDB 还是在 future 同时
我已经在单台服务器上使Kafka(使用Docker镜像)工作,在同一台/一台Server X上安装了Zookeeper + Kafka Broker。 如果我需要在服务器Y上添加其他代理(以实现 fl
所以问题是,当我尝试与上下文代理建立连接时,我是否正在尝试更新实体或读取值。只有当我第二次询问时它才回复正常。 Context Broker 版本:0.24.0(我从 0.20.0 更新,因为我认为这
我在同一个实例上有两个数据库。 一个叫 ICMS,一个叫 CarePay_DEV1 当 ICMS (Source) 发生变化时,它需要向 CarePay_Dev1 (Destination) 发送消息
即使我指定 Receive Top(25) 等,我也一次只能收到一条消息出列。不确定我在 sproc 中做错了什么?可能是一些微不足道的事情,但我没有看到问题。 过程: CREATE PROCEDUR
我们在位置 A 有 Mosquito 代理,而 Rabbit MQ 在云中。我们已经连接了两个经纪人。来自 Mosquito 中配置的 Topic 的数据将在 Rabbit MQ 中的 AMQ.Top
我正在使用 Apache Kafka。我将巨大的数据库转储到 Kafka 中,其中每个数据库的表都是一个主题。 在主题完全消耗之前我无法删除它。我无法设置基于时间的保留策略,因为我不知道主题何时会被消
我是一名优秀的程序员,十分优秀!