gpt4 book ai didi

java - 如何配置 Spring Cloud Stream 中函数的绑定(bind),以将其输入绑定(bind)到 Web 端点并将其输出绑定(bind)到 Kafka 主题

转载 作者:行者123 更新时间:2023-12-01 23:35:16 26 4
gpt4 key购买 nike

我有一个常规的 java Function;我正在尝试绑定(bind):

  1. 其对网络端点的输入
  2. 其输出到 kafka 主题。

当我在 Web 上下文中使用函数时,它总是将 Function 的结果值单独返回给 Web 客户端。我可以做这样的事情吗?:

spring.cloud.stream.bindings.input.binder=web
spring.cloud.stream.bindings.output.binder=kafka

我目前甚至尝试将 Function 拆分为 2:

  • 其中一个函数的输入绑定(bind)到 Web 客户端,其输出动态绑定(bind)到第二个函数(使用 spring.cloud.stream.sendto.destination)
  • 另一个函数,其输出绑定(bind)到 kafka 绑定(bind)。

但这种方法还是行不通。动态路由 (spring.cloud.stream.sendto.destination) 会显示在 Web 客户端上;但没有 Message 发送到 kafka 绑定(bind)本身。这是我在第二种方法(2 个函数)中使用的代码,希望简单地获得一个 Spring 功能应用程序,将其输入绑定(bind)到 Web 端点并将输出绑定(bind)到 kafka 主题。

WebToKafkaApp.java

@SpringBootApplication
public class WebToKafkaApp {
public static void main(String[] args) {
SpringApplication.run(WebToKafkaApp.class, args);
}

@Bean
public Function<String, Message<String>> webFunction() {
return payload -> createPayloadMapperToMessage("kafkaFunction").apply(payload);
}

@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> kafkaFunction() {
return flux -> flux.map(msg -> createPayloadMapperToMessage("").apply(msg.getPayload()));
}

private Function<String, Message<String>> createPayloadMapperToMessage(String destination) {
return payload -> MessageBuilder
.withPayload(payload.toUpperCase())
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
}
}

application.yml

spring.cloud.stream.bindings.webFunction-in-0:
destination: webFunctionIN
contentType: application/json
spring.cloud.stream.bindings.webFunction-out-0:
destination: webFunctionOUT
contentType: application/json
spring.cloud.stream.bindings.kafkaFunction-in-0:
destination: kafkaFunctionIN
contentType: application/json
binder: kafka
spring.cloud.stream.bindings.kafkaFunction-out-0:
destination: kafkaFunctionOUT
contentType: application/json
binder: kafka

spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092

spring.cloud.stream.function.routing.enabled: true
spring.cloud.function.definition: webFunction

build.gradle

plugins {
id 'org.springframework.boot' version '2.2.1.RELEASE'
id 'io.spring.dependency-management' version '1.0.8.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "Hoxton.RELEASE")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
implementation 'org.springframework.cloud:spring-cloud-starter-function-webflux'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
test {
useJUnitPlatform()
}

如有任何帮助,我们将不胜感激。

最佳答案

感谢Oleg用于发布the idea behind this solution 。本质上,我增强了他的建议,以一般性地处理以下之间的桥梁:

  1. 功能齐全的网络 Controller ;可以接收网络请求。
  2. 流媒体供应商;它可以将任何消息转发到消息传递基础设施。

此解决方案封装了 Oleg example 中描述的问题,在 Supplier 的自定义实现中。此类实现公开一个 API 来触发 Supplier 发出作为参数传递的消息。这样的类如下所示:

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

public class StreamSupplier implements Supplier<Flux<?>> {

private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION =
"spring.cloud.stream.sendto.destination";

public static <T> Message<?> createMessage(T payload, String destination) {
MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
if (destination != null && !destination.isEmpty())
builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);
return builder.build();
}

private String defaultDestination;
private EmitterProcessor<? super Object> processor = EmitterProcessor.create();

public StreamSupplier() {
this(null);
}

public StreamSupplier(String defaultDestination) {
this.defaultDestination = defaultDestination;
}

// SEND APIs

public <T> Message<?> sendMessage(T payload) {
return sendMessage(payload, defaultDestination);
}

public <T> Message<?> sendMessage(T payload, String destination) {
return sendBody(createMessage(payload, destination));
}

public <T> T sendBody(T body) {
processor.onNext(body);
return body;
}

/**
* Returns {@link EmitterProcessor} used internally to programmatically publish messages onto
* the output binding associated with this {@link Supplier}. Such programmatic publications
* are available through the {@code sendXXX} API methods available in this class.
*/
@Override
public Flux<?> get() {
return processor;
}
}

那么开发人员只需:

  1. 将此特定 Supplier 实现的实例注册为 Spring 应用程序中的 bean;并让 spring-cloud-function 将此 bean 扫描到 FunctionCatalog 中。
  2. 创建 web function使用先前注册的供应商将任何消息转发到流基础设施 - 可以使用 spring-cloud-stream 的所有功能进行配置。

以下示例演示了这一点:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Controller;

import java.util.function.Function;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

@SpringBootApplication
@Controller
public class MyApp {

public static void main(String[] args) {
SpringApplication.run(MyApp.class,
"--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");
}

// Functional Web Controller
@Bean
public Function<String, String> webToStreamFunction() {
return msg -> streamSupplier().sendBody(msg);
}

// Functional Stream Supplier
@Bean
public Supplier<Flux<?>> streamSupplierFunction() {
return new StreamSupplier();
}

// DOUBLE REGISTRATION TO AVOID POLLABLE CONFIGURATION
// LIMITATION OF SPRING-CLOUD-FUNCTION
@Bean
public StreamSupplier streamSupplier() {
return (StreamSupplier) streamSupplierFunction();
}
}

再次感谢Oleg提供 required details我一直在寻找构建这个全面的解决方案。

Complete code on GitHub

关于java - 如何配置 Spring Cloud Stream 中函数的绑定(bind),以将其输入绑定(bind)到 Web 端点并将其输出绑定(bind)到 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59099369/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com