gpt4 book ai didi

redis - 使用 Spring & Lettuce 接收 Redis 流数据

转载 作者:行者123 更新时间:2023-12-02 06:59:55 26 4
gpt4 key购买 nike

我有以下 Spring boot 代码,用于在 Redis 流附加新记录时接收值。问题是接收者永远不会收到任何消息,而且订阅者在使用 subscriber.isActive() 检查时始终处于非事件状态。这段代码有什么问题吗?我错过了什么? Doc以供引用。

在 Spring Boot 启动时,初始化必要的 Redis 资源

生菜连接工厂

@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory("127.0.0.1", 6379);
}

来自连接工厂的RedisTemplate

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
return redisTemplate;
}

Rest Controller 将数据附加到 Redis 流

@PutMapping("/{name}")
public String post(@PathVariable String name) {
return redisTemplate.opsForStream().add(StreamRecords.newRecord().in("streamx").ofObject(name)).getValue();
}

JMS 风格命令式消息监听器

@Component
public class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("message received: " + message.getValue());
}

}

初始化监听器

  @Bean
public Subscription listener(MyStreamListener streamListener, RedisConnectionFactory redisConnectionFactory) throws InterruptedException {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
.create(redisConnectionFactory);
Subscription subscription = container.receive(Consumer.from("my-group-1", "consumer-1"),
StreamOffset.create("streamx", ReadOffset.latest())), streamListener);
System.out.println(subscription.isActive()); // always false
return subscription;
}

不过,我可以通过 api 附加到流。

最佳答案

重要的一步是,订阅完成后启动StreamMessageListenerContainer

container.start();

关于redis - 使用 Spring & Lettuce 接收 Redis 流数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58029906/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com