gpt4 book ai didi

java - 如何将此 spring-integration 配置从 XML 转换为 Java?

转载 作者:行者123 更新时间:2023-11-30 01:53:08 26 4
gpt4 key购买 nike

这个特定的部分在应用程序中实现而不是在 XML 中更有意义,因为它是整个集群中的常量,而不是本地化到单个作业。

通过剖析 XSD,在我看来,int-kafka:outbound-channel-adapter 的 xml 构造了一个 KafkaProducerMessageHandler。

没有可见的方式来设置 channel 、主题或大多数其他属性。

请注意潜在的反对者 - (咆哮)我已经使用 RTFM 一周了,比开始时更加困惑。我对语言的选择已经从形容词发展到副词,并且我开始从其他语言借用单词。答案或许就在里面。但即使是这样,也不是凡人能找到的。 (咆哮)

XML 配置:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="kafkaTemplate"
auto-startup="false"
channel="outbound-staging"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

如果是这样,那么我希望 java 配置看起来像这样:

@Bean
public KafkaProducerMessageHandler kafkaOutboundChannelAdapter () {
KafkaProducerMessageHandler result = new KafkaProducerMessageHandler(kafkaTemplate());

result.set????? (); // WTH?? No methods for most of the attributes?!!!

return result;
}

编辑:有关正在解决的高级问题的其他信息

作为一个更大项目的一部分,我正在尝试实现 https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-partitioning 中的教科书示例,使用 Kafka 支持而不是 JMS 支持。

我相信最终的集成流程应该是这样的:

partitionHandler -> messagesTemplate -> 出站请求 (DirectChannel) -> 出站暂存 (KafkaProducerMessageHandler) -> kafka

kafka ->executionContainer(KafkaMessageListenerContainer)->inboundKafkaRequests(KafkaMessageDrivenChannelAdapter)->入站请求(DirectChannel)->serviceActivator(StepExecutionRequestHandler)

serviceActivator (StepExecutionRequestHandler) -> 回复暂存 (KafkaProducerMessageHandler) -> kafka

kafka->replyContainer(KafkaMessageListenerContainer)->inboundKafkaReplies(KafkaMessageDrivenChannelAdapter)->inbound-replies(DirectChannel)->partitionhandler

最佳答案

不确定你的意思是它们被错过了,但这是我在 KafkaProducerMessageHandler 的源代码中看到的:

public void setTopicExpression(Expression topicExpression) {
this.topicExpression = topicExpression;
}

public void setMessageKeyExpression(Expression messageKeyExpression) {
this.messageKeyExpression = messageKeyExpression;
}

public void setPartitionIdExpression(Expression partitionIdExpression) {
this.partitionIdExpression = partitionIdExpression;
}

/**
* Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
* The resulting value should be a {@link Long} type representing epoch time in milliseconds.
* @param timestampExpression the {@link Expression} for timestamp to wait for result
* fo send operation.
* @since 2.3
*/
public void setTimestampExpression(Expression timestampExpression) {
this.timestampExpression = timestampExpression;
}

等等。

您还可以访问父类(super class) setter ,例如用于 XML 变体的 setSync()

input-channel 不是 MessageHandler 的职责。它会转到 Endpoint,并且可以通过 @ServiceActivator@Bean 一起进行配置。

请参阅核心 Spring 集成引用手册中的更多信息:https://docs.spring.io/spring-integration/reference/html/#annotations_on_beans

开头还有很重要的一章:https://docs.spring.io/spring-integration/reference/html/#programming-tips

此外,最好考虑使用 Java DSL,而不是直接使用 MessageHandler:

             Kafka
.outboundChannelAdapter(producerFactory)
.sync(true)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 0)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic))
.get();

在提到的 Spring Integration 文档中查看有关 Java DSL 的更多信息:https://docs.spring.io/spring-integration/reference/html/#java-dsl

关于java - 如何将此 spring-integration 配置从 XML 转换为 Java?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55385449/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com