gpt4 book ai didi

reactjs - 从kafka获取数据并发送到RSocket

转载 作者:行者123 更新时间:2023-12-04 08:22:37 26 4
gpt4 key购买 nike

我正在尝试从 kafka 获取消息并使用 Spring 将其发送到 RSocket。使用 React 在 Spring Java 和客户端发布服务器端

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

@Bean
public Function<Integer, Mono<Integer>> rsocketConsumer(RSocketRequester.Builder builder,
RsocketConsumerProperties rsocketConsumerProperties) {
RSocketRequester rSocketRequester = builder.websocket(URI.create("ws://localhost:7000/"));
return input -> rSocketRequester.route(rsocketConsumerProperties.getRoute()).data(input).retrieveMono(Integer.class);
}
}
@EnableBinding(Sink.class)
public class Listener {

@Autowired
private Function<Integer, Mono<Integer>> rsocketConsumer;


@StreamListener(Sink.INPUT)
public void fireAndForget(Integer val) {
System.out.println(val);
rsocketConsumer.apply(val).subscribe();
}
}
@Controller
public class ServerController {

@MessageMapping("data")
public Mono<Integer> hello(Integer integer) {
return Mono.just(integer);
}

}

我在服务器端做错了什么,因为我的客户端已连接但无法获取新消息
  client.connect().subscribe({
onComplete: socket => {
socket.fireAndForget({
data: { message: "hello from javascript!" },
metadata: null
});
},
onError: error => {
console.log("got error");
console.error(error);
},
onSubscribe: cancel => {
/* call cancel() to abort */
console.log("subscribe!");
console.log(cancel);
// cancel.cancel();
}
});

最佳答案

你这样做requester.route("input").data("Welcome to Rsocket").send();我们有这个:

   /**
* Perform a {@link RSocket#fireAndForget fireAndForget} sending the
* provided data and metadata.
* @return a completion that indicates if the payload was sent
* successfully or not. Note, however that is a one-way send and there
* is no indication of whether or how the event was handled on the
* remote end.
*/
Mono<Void> send();
你看 - Mono ?这意味着必须订阅它才能启动响应式(Reactive)流处理。有关更多信息,请参阅项目 Reactor: https://projectreactor.io/
另一方面,在您的情况下,尚不清楚什么是服务器,什么是客户端...
你来做这件事:
    /**
* Build an {@link RSocketRequester} with an
* {@link io.rsocket.core.RSocketClient} that connects over WebSocket to
* the given URL. The requester can be used to make requests
* concurrently. Requests are made over a shared connection that is also
* re-established as needed when further requests are made.
* @param uri the URL to connect to
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester websocket(URI uri);
我会说这意味着您显示的代码中的客户端。服务器位于 7000 的另一侧。为 ws:// 开放端口协议(protocol)。因此,请确保您正确理解和配置所有部件。例如,我不明白您为什么需要 @RestController在您的 Listener类(class)...

关于reactjs - 从kafka获取数据并发送到RSocket,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65423761/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com