gpt4 book ai didi

kotlin - Spring Cloud Stream 会生成不必要的复杂 Kafka 拓扑,为什么?

转载 作者:行者123 更新时间:2023-12-02 13:21:15 25 4
gpt4 key购买 nike

我有一个 KStream 应用程序,其中包含一堆 KStream、连接和其他操作。我启用了logging.level.org.springframework.kafka.config=debug验证正在生成的拓扑,并发现很多根本没有意义的节点。

然后我将应用程序简化为:

interface ShippingKStreamProcessor {

@Input("input")
fun input(): KStream<Int, Customer>

}

@Suppress("UNCHECKED_CAST")
@Configuration
class ShippingKStreamConfiguration {

@StreamListener
fun process(@Input("input") input: KStream<Int, Customer> {}

}

奇怪的是,这样一个简单的 KStream 声明会生成这个复杂的拓扑:
2019-04-30 23:47:03.881 DEBUG 2944 --- [           main] o.s.k.config.StreamsBuilderFactoryBean   : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> KSTREAM-BRANCH-0000000003, KSTREAM-PROCESSOR-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-BRANCH-0000000003 (stores: [])
--> KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-BRANCHCHILD-0000000004 (stores: [])
--> KSTREAM-MAPVALUES-0000000007
<-- KSTREAM-BRANCH-0000000003
Processor: KSTREAM-BRANCHCHILD-0000000005 (stores: [])
--> KSTREAM-PROCESSOR-0000000006
<-- KSTREAM-BRANCH-0000000003
Processor: KSTREAM-MAPVALUES-0000000007 (stores: [])
--> none
<-- KSTREAM-BRANCHCHILD-0000000004
Processor: KSTREAM-PROCESSOR-0000000002 (stores: [])
--> none
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
--> none
<-- KSTREAM-BRANCHCHILD-0000000005

enter image description here

原生 Kafka 应用程序中的相同简单流会产生更合乎逻辑的拓扑:
fun main(args: Array<String>) {

val builder = StreamsBuilder()

val streamsConfiguration = Properties()
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"

val serdeConfig = mapOf(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
)

//val byteArraySerde = Serdes.ByteArray()
val intSerde = Serdes.IntegerSerde()
val customerSerde = SpecificAvroSerde<Customer>()
customerSerde.configure(serdeConfig, false)

val customerStream = builder.stream<Int, Customer>("customer",
Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>

val topology = builder.build()
println(topology.describe())

val streams = KafkaStreams(topology, streamsConfiguration)
streams.start()
}

拓扑:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
--> none

enter image description here

Spring Cloud Stream 生成如此复杂的拓扑的原因是什么?

最佳答案

@codependent 在拓扑中拥有这些额外处理器的原因是因为您使用的是框架提供的 de/serailzers( native 解码和编码默认为 false )。基本上,我们从 Kafka 主题接收数据作为 byte[]然后在内部进行转换。对于这些转换,我们会经过一些额外的处理器,因此您最终会得到更深层次的拓扑。

这是一个基本的StreamListener在 Java 中(几乎是你所拥有的,但使用更简单的值类型):

@StreamListener
public void process(@Input("input") KStream<Integer, String> input ) {

}

通过活页夹中的标准开箱即用设置,我能够获得与您观察到的相同的更深层次的拓扑。但是,当我如下所示修改应用程序的配置时,
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true


我的拓扑减少如下:
2019-05-01 18:02:12.705 DEBUG 67539 --- [           main] o.s.k.config.StreamsBuilderFactoryBean   : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [hello-1])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> none
<-- KSTREAM-SOURCE-0000000000

这仍然与您从普通 Kafka Streams 应用程序中获得的拓扑不同,但事实证明这是我们可以在 binder 中改进以避免的情况。简而言之,通过切换到 Kafka Streams 提供的 native 解码和编码,您可以避免所有那些由 binder 构建的额外拓扑级别。

在某些情况下,您别无选择,只能依赖 Spring Cloud Stream 提供的反序列化,例如,您从基于 Spring Cloud Stream 的生产者接收数据,该生产者使用了一些特殊的序列化器。我认为在您的情况下确实如此,因为据我记得,您的生产者基于 Spring Cloud Stream 并且使用框架提供的 Avro 序列化程序。在这种情况下,使用 Kafka Stream 的 Avro Serde在您的处理器中将无法工作,因为这些序列化程序不兼容。所以这里有一些你的选择。

方法#1:
  • 让您的生产者使用 Kafka 提供的 native 序列化程序。
  • 然后使用在您的 Kafka Streams 应用程序中使用相同序列化器/反序列化器的 Serde。

  • 方法#2:
  • 使用 SCSt 提供的消息序列化程序。
  • 然后使用默认的 Kafka Streams binder 提供的默认反序列化。

  • #2的缺点显然是您在上面提到的,即更深的拓扑。根据您的用例和吞吐量,这可能没问题。如果这成为一个真正的性能问题,我们可以在框架完成转换时尝试简化这个过程。

    说了这么多,我创建了一个 issue在 Kafka binder 中对 binder 的下一个版本进行更改。欢迎您的反馈、建议、赞成/反对票。

    关于kotlin - Spring Cloud Stream 会生成不必要的复杂 Kafka 拓扑,为什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55929162/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com