gpt4 book ai didi

apache-kafka - Kafka-confluent : How to use pk. mode=record_key 用于 JDBC 接收器连接器中的 upsert 和删除模式?

转载 作者:行者123 更新时间:2023-12-05 00:46:22 25 4
gpt4 key购买 nike

在 Kafka confluent 中,我们如何在使用 pk.mode=record_key 作为 MySQL 表中的复合键的同时使用源文件作为 CSV 文件使用 upsert? upsert 模式在使用 pk.mode=record_values 时工作。是否需要进行其他配置?

如果我尝试使用 pk.mode=record_key,我会收到此错误。错误 - 原因:org.apache.kafka.connect.errors.ConnectException:需要准确定义一个 PK 列,因为记录的键模式是原始类型。以下是我的 JDBC 接收器连接器配置:

    {
"name": "<name>",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "<topic name>",
"connection.url": "<url>",
"connection.user": "<user name>",
"connection.password": "*******",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "<table name>",
"pk.mode": "record_key",
"pk.fields": "field1,field2",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}

最佳答案

您需要使用record.valuepk.mode。这意味着从消息的 value 中获取字段并将它们用作目标表中的主键并用于 UPSERT 目的。

如果您设置 record.key,它将尝试从 Kafka 消息 key 中获取关键字段。除非您确实在消息键中获得了值,否则这不是您要使用的设置。

这些可能会进一步帮助您:

关于apache-kafka - Kafka-confluent : How to use pk. mode=record_key 用于 JDBC 接收器连接器中的 upsert 和删除模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61079658/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com