gpt4 book ai didi

apache-kafka - 如何更改Kafka Connect Source Connector生成的主题名称

转载 作者:行者123 更新时间:2023-12-03 16:35:15 25 4
gpt4 key购买 nike

我有一个已经在运行的生产部署了 Kafka-Cluster 并且有主题“现有主题”。我正在使用 Debezium 的 MongoDB-Source-Connector。

在这里,我想要的只是将 CDC 事件直接推送到主题“现有主题”,以便我的已经在收听该主题的消费者能够处理它。

我没有找到任何资源可以这样做,但是有人提到该主题是以以下格式创建的 -

“如果你的mongodb.name参数是A,数据库名称是B,集合名称是C,那么数据库A和集合C的数据将被加载到主题A.B.C下”

我可以将主题更改为“现有主题”并将事件推送给它吗?

最佳答案

根据documentation ,

The name of the Kafka topics always takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the mongodb.name configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed.



这意味着如果您的连接器的逻辑名称是 myConnector和您的数据库 myDatabase有两个收藏 usersorders
{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo-db-host:27017",
"mongodb.name": "myDatabase",
"collection.whitelist": "myDatabase[.]*",
}
}

然后 Kafka Connect 将使用名称填充两个主题:
  • myConnector.myDatabase.users
  • myConnector.myDatabase.orders


  • 现在,如果您仍想更改目标主题的名称,则可以使用 Kafka Connect Single Message Transforms (SMT)。更准确地说, ExtractTopic 应该可以帮助你。请注意,此 SMT 可帮助您从消息的键或值中提取主题名称,因此您需要以某种方式在有效负载中包含所需的主题名称。

    例如,以下 SMT 将提取字段 myField 的值并将其用作记录的主题:
     transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
    transforms.ValueFieldExample.field=myField

    关于apache-kafka - 如何更改Kafka Connect Source Connector生成的主题名称,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61847570/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com