gpt4 book ai didi

java - 如何从 Http 集成流创建 Spring Reactor Flux?

转载 作者:行者123 更新时间:2023-11-30 10:27:51 24 4
gpt4 key购买 nike

我有一个与这个非常相似的问题How to create a Spring Reactor Flux from a ActiveMQ queue?

有一个区别是消息来自 Http 端点而不是 JMS 队列。问题是 Message Channel 由于某种原因没有被填充,或者它没有被 Flux.from() 拾取。日志条目显示 GenericMessage 是从 Http Integration 流创建的,有效负载作为路径变量,但没有排队/发布到 channel ?我尝试了 .channel(MessageChannels.queue()).channel(MessageChannels.publishSubscribe())没有任何区别,事件流是空的。这是代码:

@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/eventmessage/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}


@GetMapping(value="eventmessagechannel/{id}", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages(@PathVariable String id){
return Flux.from(httpReactiveSource())
.map(Message::getPayload);

}

更新 1:

构建.gradle

buildscript {
ext {
springBootVersion = '2.0.0.M2'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
}


dependencies {
compile('org.springframework.boot:spring-boot-starter-freemarker')
compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.integration:spring-integration-http')
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('io.projectreactor:reactor-test')

}

更新2

@SpringBootApplication@RestController在一个文件中定义时有效,当@SpringBootApplication@时停止工作RestController 在单独的文件中。

测试应用程序.java

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class TestApp {
public static void main(String[] args) {
SpringApplication.run(TestApp.class, args);
}
}

测试 Controller .java

package com.example.controller;


import org.springframework.context.annotation.Bean;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;



import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Flux;



@RestController
public class TestController {
@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/message/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.toReactivePublisher();
}

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages() {
return Flux.from(httpReactiveSource())
.map(Message::getPayload);
}

}

最佳答案

这很适合我:

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

public static void main(String[] args) {
SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
}

@Bean
public Publisher<Message<String>> httpReactiveSource() {
return IntegrationFlows.
from(Http.inboundChannelAdapter("/message/{id}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.payloadExpression("#pathVariables.id")
)
.channel(MessageChannels.queue())
.toReactivePublisher();
}

@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> eventMessages() {
return Flux.from(httpReactiveSource())
.map(Message::getPayload);
}

}

我在 POM 中有这个依赖项:

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-http</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

我运行应用程序并有两个终端:

curl http://localhost:8080/events

听 SSE。

在第二个中我执行了这个:

curl -X POST http://localhost:8080/message/foo

curl -X POST http://localhost:8080/message/bar

curl -X POST http://localhost:8080/message/666

因此,第一个终端响应如下:

data:foo

data:bar

data:666

请注意,我们不需要 spring-boot-starter-webflux 依赖项。 SSE 的 Flux 与 Servlet 容器上的常规 MVC 配合得很好。

Spring Integration 也将很快支持 WebFlux:https://jira.spring.io/browse/INT-4300 .因此,您将能够在那里进行配置,例如:

   IntegrationFlows
.from(Http.inboundReactiveGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

并且完全只依赖 WebFlux,没有任何 Servlet 容器依赖性。

关于java - 如何从 Http 集成流创建 Spring Reactor Flux?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45076028/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com