gpt4 book ai didi

java - 使用 Confluence 的 Spring Cloud Stream Kafka 不会生成与使用 Confluence 的 Spring Kafka 相同的消息

转载 作者:行者123 更新时间:2023-11-30 05:39:29 26 4
gpt4 key购买 nike

我想将 Spring Cloud Stream Kafka 用于我的 Java/Spring 服务,并且我需要生成 Confluence 序列化消息,因为我有 .NET 和 NodeJS 客户端,它们使用 Confluence API 来使用我的消息。

据我们所知,带有 Confluence 序列化器的 Spring Kafka 正在为我们工作,而带有 Confluence 序列化器的 Spring Cloud Stream Kafka 却给我们带来了问题。

为了演示我可以在这两种情况下看到差异,我在 GitHub 上创建了 2 个示例存储库,仅包含在这两种情况下生成简单消息所需的代码。

  1. 使用 Spring Kakfa 和 Confluence https://github.com/donalthurley/springKafkaAvro

  2. 使用 Spring Cloud Stream Kafka 和 Confluence https://github.com/donalthurley/springCloudKafkaAvro

我认为我已经使用 useNativeEncoding 标志和合流序列化器配置为 Spring Cloud 应用程序正确配置了配置设置,这些可以在此处的源代码中看到 https://github.com/donalthurley/springCloudKafkaAvro/blob/master/src/main/resources/application.yaml#L8

      kafka:
binder:
useNativeEncoding: true
brokers: 127.0.0.1:9092
bindings:
output:
producer:
configuration:
schema.registry.url: http://127.0.0.1:8081
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

日志显示,我从 Spring Kafka 应用程序和 Spring Cloud Stream Kafka 应用程序发送了相同的简单消息。

Producing Kafka person event: {"lastName": "Doe", "firstName": "John"}

当我在 docker Kafka 环境中使用 Kafka Topics UI 浏览器时,请参阅 https://hub.docker.com/r/landoop/fast-data-dev/ ,并查看消息原始数据,这两种情况是不同的。

对于 Spring Kafka 来说,它看起来更正确,因为浏览器可以识别并显示消息值内的字段。

[
{
"topic": "test_spring_kafka",
"key": "3197449393600061094",
"value": {
"lastName": "Doe",
"firstName": "John"
},
"partition": 0,
"offset": 0
}
]

在 Spring Cloud Stream Kafka 原始数据中,浏览器无法识别字段,这表明消息不相同。

[
{
"topic": "test_spring_cloud_kafka",
"key": "-6214497758709596999",
"value": "\u0006Doe\bJohn",
"partition": 0,
"offset": 0
}
]

我认为使用 Spring Cloud Stream Kafka 生成 Confluence 消息可能存在问题,并且 Spring Kafka 实现正在正确生成它们,但也许我在实现中遗漏了一些内容,有人可以帮助我解决这个问题?

最佳答案

问题出在您配置useNativeEncoding的方式上。它没有生效。此配置应该有效:

spring:
application:
name: springCloudKafkaAvro
cloud:
stream:
schemaRegistryClient:
endpoint: http://127.0.0.1:8081
kafka:
binder:
brokers: 127.0.0.1:9092
bindings:
output:
producer:
configuration:
schema.registry.url: http://127.0.0.1:8081
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
bindings:
output:
destination: test_spring_cloud_kafka
producer:
useNativeEncoding: true

请注意 useNativeEncoding 是如何从原始配置中重新排列的。

关于java - 使用 Confluence 的 Spring Cloud Stream Kafka 不会生成与使用 Confluence 的 Spring Kafka 相同的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55951286/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com