- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有几个关于 spring-kafka 在某些情况下的行为的问题。任何答案或指示都会很棒。
背景:我正在构建一个与外部 api 对话并发回确认的 kafka 消费者。我的配置如下所示:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.configuration.getString("kafka-generic.consumer.group.id"));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "6000000");
return props;
}
@Bean
public RetryTemplate retryTemplate() {
final ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();
backOffPolicy.setInitialInterval(this.configuration.getLong("retry-exp-backoff-init-interval"));
final SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(this.configuration.getInt("retry-max-attempts"));
final RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> retryKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Event> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(this.configuration.getInt("kafka-concurrency"));
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setIdleEventInterval(this.configuration.getLong("kafka-rtm-idle-time"));
//factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(kafkaConsumerErrorHandler);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
假设我有 4 个分区。我的 KafkaListener 分区分布是:
@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"0", "1"}),
containerFactory = "retryKafkaListenerContainerFactory")
public void receive(Event event, Acknowledgment acknowledgment) throws Exception {
serviceInvoker.callService(event);
acknowledgment.acknowledge();
}
@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"2", "3"}),
containerFactory = "retryKafkaListenerContainerFactory")
public void receive1(Event event, Acknowledgment acknowledgment) throws Exception {
serviceInvoker.callService(event);
acknowledgment.acknowledge();
}
现在我的问题是:
假设我有 2 台机器部署了这段代码(具有相同的消费者组 ID)。如果我理解正确,如果我得到一个分区事件,其中一台机器的相应分区的 kafkalistener 将会监听,但其他机器的 kafkalistener 不会监听这个事件。是吗?
我的错误处理程序是:
@Named
public class KafkaConsumerErrorHandler implements ErrorHandler {
@Inject
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
System.out.println("Shutting down all the containers");
kafkaListenerEndpointRegistry.stop();
}
}
让我们谈谈一个场景,其中消费者的 kafkalistener 被调用,它调用 serviceInvoker.callService(event);
但服务已关闭,然后根据 retryKafkaListenerContainerFactory
,它重试 3 次然后失败,然后调用错误处理程序从而停止 kafkaListenerEndpointRegistry。这会关闭具有相同消费者组的所有其他消费者或机器,还是仅关闭此消费者或机器?
让我们谈谈 2 中的场景。是否有任何我们需要更改的配置,让 kafka 知道推迟那么长时间的确认?
我的 kafka 生产者每 10 分钟生成一次消息。我是否需要在我的消费者代码中的任何地方配置这 10 分钟,或者它是否与此无关?
在我的 KafkaListener 注释中,我对主题名称和分区进行了硬编码。我可以在运行时更改它吗?
非常感谢任何帮助。提前致谢。 :)
最佳答案
ackOnError=false
,因此不会提交偏移量。${...}
或 Spel 表达式 #{...}
来设置它们在应用程序初始化期间启动。关于java - ConcurrentKafkaListenerContainerFactory 的 Spring-Kafka 消费者组协调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44014490/
我是 kafka 的新手,我经历了 documentation但我什么也听不懂。有人可以解释一下何时使用 ConcurrentKafkaListenerContainerFactory 类吗?我使用了
我正在使用 Spring Kafka 2.2.7,我已经使用 kafkaListenerContainerFactory 配置了 @EnableKafka 并使用 @KafkaListener 来消费
我有几个关于 spring-kafka 在某些情况下的行为的问题。任何答案或指示都会很棒。 背景:我正在构建一个与外部 api 对话并发回确认的 kafka 消费者。我的配置如下所示: @Bean p
的默认值是多少?私有(private)整数并发; ,在 ConcurrentKafkaListenerContainerFactory.java 中? 卡夫卡版本:0.10.2.1 spring-ka
我正在致力于使用 Spring-Kafka 框架实现 Kafka Topics 消息的消费。我试图了解我为 Kafka Listener 创建的 ConcurrentKafkaListenerCont
我是一名优秀的程序员,十分优秀!