gpt4 book ai didi

spring-integration:如何从 WebFlux 集成流创建 Spring Reactor Flux?

转载 作者:行者123 更新时间:2023-12-05 05:16:38 24 4
gpt4 key购买 nike

How to create a Spring Reactor Flux from Http integration flow? artem-bilan mentioned in a comment将来可以使用 webflux 集成。

自撰写评论以来,WebFlux 集成一直是 factored out to spring-integration-webflux .我尝试了以下方法,通过将 MVC 版本的 Http.inboundChannelAdapter@GetRequest 处理程序替换为WebFlux.inboundChannelAdapterWebFlux.inboundGateway:

@SpringBootApplication
public class WebfluxApplication {

public static void main(String[] args) {
SpringApplication.run(WebfluxApplication.class, args);
}


@Bean
public Publisher<Message<String>> reactiveSource() {
return IntegrationFlows.
from(WebFlux.inboundChannelAdapter("/message/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.log()
.channel(MessageChannels.flux())
.toReactivePublisher();
}


@Bean
public IntegrationFlow eventMessages() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/events")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> reactiveSource())
.get();
}

似乎 reactiveSource() 发布者中的流没有收到任何消息,至少没有为我的 .log() 语句记录任何内容。

当我替换 eventMessages 流程中的 reactiveSource() 发布者时

.handle((p, h) -> reactiveSource()) 

由假出版商

.handle((p, h) -> Flux.just("foo", "bar"))

我收到 SSE 回复

curl localhost:8080/events

跟踪日志显示 reactiveSource() POST 处理程序已映射并且正在调用 WebFluxInboundEndpoint.handle 方法:

2018-05-05 16:50:58.788  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/message/{id}],methods=[POST]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2018-05-05 16:50:58.789 INFO 6552 --- [ main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/events],methods=[GET || POST],produces=[text/event-stream]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2018-05-05 16:50:59.191 INFO 6552 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-05-05 16:50:59.192 INFO 6552 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2018-05-05 16:50:59.196 INFO 6552 --- [ main] d.e.sample.webflux.WebfluxApplication : Started WebfluxApplication in 2.608 seconds (JVM running for 3.419)
2018-05-05 16:51:06.918 DEBUG 6552 --- [ctor-http-nio-2] o.s.web.reactive.DispatcherHandler : Processing POST request for [http://localhost:8080/message/4]
2018-05-05 16:51:06.932 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
2018-05-05 16:51:06.933 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@775cdb20]
2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 16:51:06.967 DEBUG 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod : Response fully handled in controller method
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] o.s.web.reactive.DispatcherHandler : Processing POST request for [http://localhost:8080/message/4]
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@71f648a3]
2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 16:51:11.364 DEBUG 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod : Response fully handled in controller method

这是为什么?

最佳答案

原因似乎是 WebFluxInboundEndpoint 在 doHandle() 中停止处理没有正文的 POST 请求,行

.map(body -> new HttpEntity<>(...)) 

如果没有请求体内容则永远不会执行:

private Mono<Void> doHandle(ServerWebExchange exchange) {
return extractRequestBody(exchange)
.doOnSubscribe(s -> this.activeCount.incrementAndGet())
.map(body -> new HttpEntity<>(body, exchange.getRequest().getHeaders()))
.map(entity -> buildMessage(entity, exchange))
.flatMap(requestMessage -> {
if (this.expectReply) {
return sendAndReceiveMessageReactive(requestMessage)
.flatMap(replyMessage -> populateResponse(exchange, replyMessage));
}
else {
send(requestMessage);
return setStatusCode(exchange);
}
})
.doOnTerminate(this.activeCount::decrementAndGet);

}

解决方法:调用者必须发送任何非空的请求主体才能使其工作,例如用 -d 传递的单引号就足够了:

curl -d ' http://localhost:8080/message/4

有了这样的请求,我的日志包含预期的传入 GenericMessage 并且/events 资源开始生成 SSE。

2018-05-05 17:25:24.777 TRACE 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 17:25:24.777 DEBUG 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod : Response fully handled in controller method
2018-05-05 17:25:24.778 INFO 40436 --- [ctor-http-nio-8] o.s.integration.handler.LoggingHandler : GenericMessage [payload=4, headers={http_requestMethod=POST, Accept=*/*, User-Agent=curl/7.49.1, http_requestUrl=http://localhost:8080/message/4, Host=localhost:8080, id=9a09294d-280a-af3b-0894-23597cf1cb5f, Content-Length=1, contentType=application/x-www-form-urlencoded, timestamp=1525533924778}]

关于spring-integration:如何从 WebFlux 集成流创建 Spring Reactor Flux?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50188401/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com