gpt4 book ai didi

spring-boot - 如何使用 Spring Cloud Stream Supplier 向 Kafka 发送键控消息

转载 作者:行者123 更新时间:2023-12-05 03:55:20 24 4
gpt4 key购买 nike

我想使用 Spring Cloud Stream 向 Kafka 生成键控(具有特定键的消息)消息。

@SpringBootApplication
public class SpringCloudStreamKafkaApplication {

public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamKafkaApplication.class, args);
}

@Bean
Supplier<DataRecord> process(){
return () -> new DataRecord(42L);
}

}

我需要在供应商代码中更改什么以提供 key ?是否可以使用新的 API 样式(使用 lambda)?

谢谢

最佳答案

返回 Message<?>并设置 KafkaHeaders.MESSAGE_KEY header :

@Bean
Supplier<Message<String>> process() {
return () -> MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
.build();
}

(假设默认的 key 序列化器(byte[])。

编辑

这将被无休止地调用。

如果你想发送有限流,我相信你必须切换到 react 模型。

@Bean
Supplier<Flux<Message<String>>> processFinite() {
Message<String> msg1 = MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes())
.build();
Message<String> msg2 = MessageBuilder.withPayload("baz")
.setHeader(KafkaHeaders.MESSAGE_KEY, "qux".getBytes())
.build();
return () -> {
return Flux.just(msg1, msg2);
};
}

还有Flux.fromStream(myStream) .

它将在流的末尾结束。

EDIT2

您还可以使用 StreamBridge .

https://docs.spring.io/spring-cloud-stream/docs/3.1.4/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

关于spring-boot - 如何使用 Spring Cloud Stream Supplier 向 Kafka 发送键控消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60229967/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com