- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我是第一次使用 Spring Kafka,我无法在我的消费者代码中使用 Acknowledgement.acknowledge() 方法进行手动提交,如此处所述 https://docs.spring.io/spring-kafka/reference/html/_reference.html#committing-offsets .我的是 spring-boot 应用程序。如果我没有使用手动提交过程,那么我的代码就可以正常工作。但是当我使用Acknowledgement.acknowledge() 对于手动提交,它显示与 bean 相关的错误。另外,如果我没有正确使用手动提交,请建议我正确的方法。
错误信息:
***************************
APPLICATION FAILED TO START
***************************
Description:
Field ack in Receiver required a bean of type 'org.springframework.kafka.support.Acknowledgment' that could not be found.
Action:
Consider defining a bean of type 'org.springframework.kafka.support.Acknowledgment' in your configuration.
我用谷歌搜索了这个错误,我发现我需要添加 @Component 但它已经存在于我的消费者代码中。
我的消费者代码如下所示:Receiver.java
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
@Autowired
public Acknowledgment ack;
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "${kafka.topic.TestTopic}")
public void receive(ConsumerRecord<?, ?> consumerRecord){
System.out.println(consumerRecord.value());
latch.countDown();
ack.acknowledge();
}
}
我的生产者代码如下所示:Sender.java
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private KafkaTemplate<String, Map<String, Object>> kafkaTemplate;
public void send(Map<String, Object> map){
kafkaTemplate.send("TestTopic", map);
}
}
编辑 1:
我的新消费者代码如下所示:Receiver.java
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "${kafka.topic.TestTopic}", containerFactory = "kafkaManualAckListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack){
System.out.println(consumerRecord.value());
latch.countDown();
ack.acknowledge();
}
}
我也改变了我的配置类:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;
@Bean
public Map<String, Object> consumerConfigs() throws SendGridException {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/*@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
}
将 containerFactory = "kafkaManualAckListenerContainerFactory"添加到我的 receive() 方法后,出现以下错误。
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'
Action:
Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
最佳答案
对于那些仍在寻找与手动确认有关的这些错误的解决方案的人,您无需指定 containerFactory = "kafkaManualAckListenerContainerFactory",而是只需添加:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
在您返回工厂对象之前添加到您的接收器配置。
那么你还需要:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
在消费者配置 Prop 中。
所以最后你的监听器方法可以简单地看起来像:
@KafkaListener(topics = "${spring.kafka.topic}")
private void listen(@Payload String payload, Acknowledgment acknowledgment) {
//Whatever code you want to do with the payload
acknowledgement.acknowledge(); //or even pass the acknowledgment to a different method and acknowledge even later
}
关于java - 如何使用Spring Kafka的Acknowledgement.acknowledge()方法进行手动提交,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46325540/
根据 /spring-kafka/docs/2.4.4.RELEASE/,关于 Kafka 的新功能,旨在否定确认,现在由 Spring-Kafka 支持。 “…… 从 2.3 版开始,Acknowl
我正在实现一个基于 kafka 的应用程序,我想在其中手动确认收到的消息。架构迫使我在一个单独的线程中完成它。 问题是:在与消费者不同的线程中执行 Acknowledgement.acknowledg
目前我使用服务器接收 GCP Pub/Sub 消息,我想执行 Purchase.Subscriptions:acknowledge调用以确认在服务器收到通知时尚未确认的订阅。 这是场景... 用户购
[!] 您的 Podfile 需要安装插件 cocoapods-acknowledgements。请安装它并再次尝试安装。 最佳答案 将其放入终端 gem install cocoapods-ackn
将我们的 Jenkins 主安装更新到最新的 LTS 版本 2.46.3 后,其从属设备之一(Windows 7 计算机,32 位)无法与主设备连接。 我们收到的错误是: java -jar slav
我想同时使用来自多个队列的jms消息。所有消息在长时间运行处理后都应该进入数据库,我无权丢失它们。 问题:是否可以保存消息以供将来确认,并在处理另一条消息时调用 oldMessage.acknowle
我有 N 个 tombcat 服务器监听一个 Tibco EMS 队列。我必须发送 N 条消息,每条消息必须由特定服务器处理(消息 1 必须由 tomcat 服务器 1 处理,...,消息 N 必须由
我一直在尝试提出一种 Camel 路线,可以从 activemq 中读取数据并将其写入 Oracle AQ。 但是,当消息成功写入 Oracle-aq 时,我必须将成功的消息写入另一个 Active
我是第一次使用 Spring Kafka,我无法在我的消费者代码中使用 Acknowledgement.acknowledge() 方法进行手动提交,如此处所述 https://docs.spring
我知道这个问题已经有人问过,但我找不到好的答案。每当我点击我制作的按钮时,一切似乎都正常,但我创建了多个 channel 。它应该只做一个。然后我得到这个错误。 代码: else if (intera
JMS session 到底意味着什么? JMS session 可以或不可以“事务处理”是什么意思? JMS session 可以或不可以“自动确认”是什么意思? 最佳答案 您可以将 JMS ses
private val DATABASE:String = config.getString("db.dbname") private val SERVER:ServerAddress = {
我已经在 MongoDB 数据库中存储了一个 pdf,我正在按如下方式访问它: dbPDFReports = client['pdfReports'] 客户端是我的 MongoClient 如下: c
CocoaPods 会自动为我使用过的所有项目生成一个很好的确认列表,以便我可以将这些信息包含在我的应用程序 UI 中。 整个 Android/Gradle 情况是否有类似的情况? 更一般地说,如何自
我正在开发一个连接到 Spring-MVC 服务器的 Java 应用程序,使用 Spring-Security 进行身份验证/授权。登录部分有效,我在 Java 应用程序中得到了一个 JSESSION
当在具有 AUTO_ACKNOWLEDGE 模式的 session 中调用 Message.acknowledge() 时,TIBCO 会发生什么情况? 我的意思是这个调用在客户端被忽略了吗?还是服务
我正在构建一个应用程序并希望将 cocoapods 自动生成的确认 Markdown 文件加载到一个 NSString 中以显示在我的应用程序中。我认为它会像这样做一样简单: NSString *pa
我正在使用 boost asio 通过 TCP 执行文件传输。文件传输有效,但是当我决定通过链接 async_write (在服务器上)和 async_read_until (在客户端上)实现从服务器
我正致力于在我正在编写的使用 Intel VT-x 虚拟化的 VMM 中支持发布中断。启用发布中断的文档中指定的 VM 进入要求之一是“退出时确认中断”VM 退出控制必须设置为 1。 当我将此控件设置
在我们的应用程序中,发布者创建一条消息并将其发送到一个主题。 然后它需要等待,当所有主题的订阅者都确认消息时。 它不会出现,消息总线实现可以自动执行此操作。因此,我们倾向于让每个订阅者在完成后向客户发
我是一名优秀的程序员,十分优秀!