gpt4 book ai didi

java - RSocket 适用于生成的数据,但不适用于 Spring Reactive MongoDB

转载 作者:可可西里 更新时间:2023-11-01 09:38:11 29 4
gpt4 key购买 nike

解决摘要:

在目前的大多数 RSocket 示例中,即使在 SpringBoot 相关教程中,服务器端接受器也被简单地构造为一个新对象(如下面的 new MqttMessageService() )。如果您在接受器类中生成示例内容,这很好,但当接受器依赖于容器中的其他 bean 时,可能会导致以下依赖注入(inject)相关的混淆。

原始问题:

尝试通过 Rsocket 的 Java 服务器使用 Spring Data Reactive Mongodb 存储库流式传输数据库条目时,我收到 NullPointerException。

问题是在调试过程中所有组件单独工作:我可以通过同一个 Mongodb 存储库获取请求的数据,我还可以使用 Rsocket 在同一个服务器和客户端之间流式传输随机生成的数据。

所以我要么遗漏了一些非常基本的东西,要么一起使用 Reactive Mongodb 和 Rsocket 可能会出现问题。

这是原始的服务器端 Rsocket 配置:

@Configuration
public class RsocketConfig {

@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}

下面是具有适当 DI 的工作服务器端 Rsocket 配置:

@Configuration
public class RsocketConfig {

@Autowired
MqttMessageService messageService;

@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(messageService))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}

这是服务器端 AbstractRSocket 实现,其中在返回 service.findAll() 时抛出 NullPointerException。

@Service
public class MqttMessageService extends AbstractRSocket {



@Autowired
private MqttMessageEntityService service;

@Override
public Flux<Payload> requestStream(Payload payload) {
return service.findAll()
.map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));

}
}

这里是响应式存储库和相关服务。该服务在注入(inject)到服务器的 AbstractRSocket 实现时返回 null,但在注入(inject)到其他类时工作正常:

@Service
public class MqttMessageEntityService {

@Autowired
private MqttMessageEntityRepository repository;

public Flux<MqttMessageEntity> findAll() {
return repository.findAll();
}

}

public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {

}

下面是与测试内容完美配合的客户端代码:

@Configuration
public class RsocketConfig {

@PostConstruct
public void testRsocket() {

RSocket rSocketClient = RSocketFactory
.connect()
.transport(TcpClientTransport.create(8802))
.start()
.block();

rSocketClient
.requestStream(DefaultPayload.create(""))
.blockLast();
}
}

我在这里的知识水平可能有点高,而且关于该主题的资源非常有限,所以我很感激对解决方案的任何提示:)

最佳答案

关于

@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose();
}

您是否使用 来保持服务器 Activity ?如果是这样,在 onClose() 之后添加另一个 block 。

messageEntityService 是否为空?因为如果变量 topicStart 和 module 不是,那看起来是唯一可能导致错误的东西。特别是如果其他代码有效 - 我真的看不到任何会导致 RSocket 端出现问题的东西。

关于java - RSocket 适用于生成的数据,但不适用于 Spring Reactive MongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53993152/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com