- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在 spring-boot 中实现一个端点,当调用该端点时,它将转储 kafka 主题中的所有消息(用于测试)。
我期望的行为是,当生产者写入“testTopic”主题,随后消费者进行轮询时,它应该读取刚刚生成的消息。
我观察到的行为是消费者无法消费生成的消息。此外,如果生产者生成了更多消息(例如 10-15 条),那么消费者将一次性转储所有消息。从这一点开始,如果生产者生产一条消息,那么消费者就会按预期消费。
直觉上我认为设置 FETCH_MIN_BYTES_CONFIG
可能与此有关 - 也许消费者正在等待写入足够的字节。但这已经设置为 1 字节(默认值),并且不能解释后续成功的单个消息读取。
接下来我想也许我在创建主题之前注册了消费者(通过太快地调用注册端点)。但我在注册消费者之前从 kafka-topics.sh 确认该主题存在。
我注意到,如果我启用偏移量自动提交,那么行为有时会符合预期,有时则不会。通过手动提交偏移量(下面的代码中未显示),如上所述,行为非常奇怪。
我还通过使用 kafka-console-consumer
确认生产者正在按预期工作。
还尝试将轮询超时增加到 1 秒,但没有成功。
// Consumer
@Component
public class TestConsumer{
private KafkaConsumer testConsumer = null;
public void registerConsumer(final String consumerId) {
if (consumer == null) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
testConsumer = new KafkaConsumer<String, String>(props);
testConsumer.subscribe(Collections.singletonList("testTopic"));
}
else{
logger.debug("Consumer already registered");
}
}
public Map<String, List<String>> consume() {
Map<String, List<String>> messages = new HashMap<>();
if (testConsumer == null){
logger.error("testConsumer was not instantiated");
return null;
}
ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
List<String> buffer = new ArrayList<>();
for (ConsumerRecord<String, String> record: records){
logger.debug(String.format("Consuming %s", record.value()));
buffer.add(record.value());
}
messages.put("data", buffer);
return messages;
}
}
步骤顺序是:1. Spring Boot应用程序启动2. kafka主题已创建,我可以通过kafka控制台确认3.我注册生产者和消费者4. Producer生产,我可以通过kafka控制台确认这一点(不同的消费者组)5.消费者消费失败
我期望结果如下:
{
"data" : ["message1"]
}
我得到的是
{
"data" : []
}
知道为什么消费者在写入阈值数量的消息之前不消费记录吗?
EDIT_1:向消费者添加了 props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
属性,但没有效果。
最佳答案
当您手动调用此testConsumer.poll(Duration.ofMillis(100))
时。你需要不断地从主题中汲取灵感。就像在无限 while 循环中一样。例如:
while (true) {
Map records = consume();
logger.debug("received records: {}", records);
}
看看这个链接:Kafka consumer
关于java - Kafka Consumer仅在产生 'enough'数据后才读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56178403/
我已经开始研究 MassTransit 并正在编写将处理消息的类。当我从 Consumes 实现接口(interface)时我有四个选项:All , Selected , For和 Context .
我正在尝试找出消费者群体级别是否也有任何抵消。在 Kafka 中,Consumer Offset 是在 Consumer group 级别还是在该 consumer group 内的单个消费者? 最佳
我有一个我不理解的 java 编译器错误。看来消费者 和 Consumer(带有 T 扩展对象)在方法签名参数中不等效。请查看以下代码: import java.util.function.Consu
我在泛型方面遇到了一些麻烦,尽管找到了解决方法,但我不明白是什么阻止了我的代码编译。 我有一个显示 TreeTableView 的 JavaFX 项目:
C++11 标准定义了一个内存模型(1.7、1.10),其中包含内存排序,大致为“顺序一致”、“获取”、“消耗”、“释放”和“放松”。同样粗略地,一个程序只有在它是无种族的情况下才是正确的,如果所有
我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在,在消费者方面,我有两种选择。 1。使用 KafkaConsumer - 下面是 kafkaConsumer 的代码
我有四个当前消费者在 Amazon AWS 上收听同一个队列。从队列中拉取消息时,有时会出现同一条消息被两个不同的消费者消费的情况。请看下面的日志: 18:01:46,515 [jmsContaine
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
我们有一个系统,我们希望将记录(例如联系人、客户、机会)从我们的系统推送到 SalesForce。 为此,我们使用了 ForceToolKit for .Net .我们成功地将联系人记录从我们的系统推
我怎样才能写一个方法来组合 Stream的 Consumers成单个 Consumer使用 Consumer.andThen(Consumer) ? 我的第一个版本是: Consumer combi
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我正在尝试在我的 scala play 应用程序中创建消费者 secret / key 对,但我似乎无法让它正常工作。我有以下代码 import org.apache.commons.codec.bi
我通过传递用户(消费者)名称使用 .NET 应用程序,我需要从 Salesforce 检索消费者 key 和消费者 key ,我该如何实现。 最佳答案 Consumer Key 和 Consumer
我想设置 至 0 .这似乎是另一个问题 ( JMS queue with multiple consumers ) 的答案,并在此 article 中进行了描述。在第 17.1.1 章中。我使用 JN
I have send message api to my users.When I send to message from my x numbers I need to wait 10-15
我有一个 java Kafka 消费者,我在其中批量获取 ConsumerRecords 进行处理。示例代码如下- while (true) { ConsumerRecords records
我正在为 iPhone 编写 Twitter/Facebook 应用程序。我有自己的 Apache/PHP 服务器。我只想把Consumer Key放在app里,然后我把Consumer Secret
Spring AMQP:比较多个消费者与每个消费者多个线程的性能 我正处于从 Spring 文档学习 Spring AMQP 的阶段。我不清楚提高异步消息消费率的首选方法:根据 Spring 文档 (
我正在制作一个需要 oAuth 1.0 身份验证的应用程序。我可以访问客户提供的消费者 key 和消费者 secret 。我曾尝试使用 AFNetworking 进行此操作,但效果不佳。有人可以建议我
我是一名优秀的程序员,十分优秀!