
mysql - How do I configure the Debezium MySQL connector to produce a primitive key instead of a struct or JSON object?

Reposted. Author: 行者123. Updated: 2023-11-29 16:13:05

I am using Debezium to detect changes in a MySQL source table. How can I produce Kafka messages whose key is a numeric (Long) value rather than a JSON object?

What I get:

key: {"foo_id": 123} 
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}

What I want:

key: 123
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}

My FOO table looks like this:

foo_id: INT
bar: VARCHAR
baz: VARCHAR

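Note that I am not using Avro. I have tried several combinations of the settings below (with and without the key converter), but I have not been able to get a Long key.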

"transforms": "unwrap,insertKey,extractKey",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false",
"transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields":"foo_id",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"foo_id",
"key.converter" : "org.apache.kafka.connect.converters.LongConverter",
"key.converter.schemas.enable": "false",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
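To make the intent of the `unwrap,insertKey,extractKey` chain easier to follow, here is a minimal plain-Python sketch of what each step is meant to do to a record. It is only a model: dicts stand in for Connect structs, the function names and the sample envelope are made up for illustration, and it covers only insert/update events that carry a row (not deletes or tombstones). It does not use the Kafka Connect API.

```python
# Plain-Python model of the transform chain; NOT the Kafka Connect API.
# Dicts stand in for Connect Structs, and all names here are illustrative.

def unwrap(record):
    """Model of UnwrapFromEnvelope: keep only the 'after' row state."""
    return {"key": record["key"], "value": record["value"]["after"]}


def value_to_key(record, fields):
    """Model of ValueToKey: build a struct-like key from the named value fields."""
    value = record["value"]
    return {"key": {name: value[name] for name in fields}, "value": value}


def extract_key_field(record, field):
    """Model of ExtractField$Key: replace the struct-like key with one of its fields."""
    return {"key": record["key"][field], "value": record["value"]}


# A simplified change event for an insert into FOO (shape assumed for illustration).
envelope = {
    "key": {"foo_id": 123},
    "value": {
        "before": None,
        "after": {"foo_id": 123, "bar": "blahblah", "baz": "meh"},
        "op": "c",
    },
}

record = extract_key_field(value_to_key(unwrap(envelope), ["foo_id"]), "foo_id")
print(record["key"])    # the bare number, no longer wrapped in an object
print(record["value"])  # the flattened row
```

The last step is the one that turns the key from an object with a single `foo_id` field into the bare value; the key converter then only has to serialize a number.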

I am not sure whether ValueToKey or ExtractField work with a (MySQL) source, but I get the NPE below.

Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Best answer

I found a solution based on https://issues.jboss.org/browse/DBZ-689

{
...
"config": {
"transforms": "unwrap,insertKey,extractKey",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false",
"transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields":"foo_id",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"foo_id",
"key.converter" : "org.apache.kafka.connect.converters.IntegerConverter",
"key.converter.schemas.enable": "true",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"include.schema.changes": "false" <-- this was missing
}
}
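A plausible reading of why the marked flag matters: as far as I know, `include.schema.changes` defaults to true, so the connector also emits schema change (DDL) events, and those pass through the same transform chain without carrying a `foo_id` field. The sketch below models that situation in plain Python. The event shape and the `value_to_key` function are illustrative assumptions, not Debezium or Kafka Connect code, and a `LookupError` stands in for the NullPointerException in the trace.

```python
# Illustrative model only: shows how a key-building step fails on a record
# that lacks the configured field. Not the real ValueToKey implementation.

def value_to_key(value, fields):
    """Build a key from the named fields of the value, failing if one is absent."""
    key = {}
    for name in fields:
        if name not in value:
            # Stand-in for the failure seen in ValueToKey.applyWithSchema.
            raise LookupError(f"value has no field {name!r}")
        key[name] = value[name]
    return key


# A row change from FOO carries the field the transform needs.
row_change = {"foo_id": 123, "bar": "blahblah", "baz": "meh"}

# A schema change event describes DDL instead (shape assumed for illustration).
schema_change = {"databaseName": "mydb", "ddl": "ALTER TABLE FOO ..."}

for event in (row_change, schema_change):
    try:
        print("key:", value_to_key(event, ["foo_id"]))
    except LookupError as err:
        print("transform failed:", err)
```

Under that reading, setting `include.schema.changes` to false removes the records that the chain cannot handle, which matches the behavior the answer reports.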

Now I get foo_id as an Integer (no big deal that it is not a Long) :)
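For a consumer, the practical difference between an Integer key and a Long key is the width of the key bytes. The sketch below assumes the converters write the same layout as Kafka's integer and long serializers, that is, 4-byte and 8-byte big-endian signed values; the helper names are hypothetical.

```python
import struct

# Assumed layouts: 4-byte big-endian for Integer keys, 8-byte big-endian for Long keys.

def integer_key_bytes(n):
    """Encode n the way an Integer key is assumed to be written (4 bytes)."""
    return struct.pack(">i", n)


def long_key_bytes(n):
    """Encode n the way a Long key is assumed to be written (8 bytes)."""
    return struct.pack(">q", n)


def read_key(raw):
    """Decode a key of either width, picking the format from its length."""
    if len(raw) == 4:
        return struct.unpack(">i", raw)[0]
    if len(raw) == 8:
        return struct.unpack(">q", raw)[0]
    raise ValueError(f"unexpected key length: {len(raw)}")


print(integer_key_bytes(123).hex())  # 0000007b
print(long_key_bytes(123).hex())     # 000000000000007b
print(read_key(integer_key_bytes(123)), read_key(long_key_bytes(123)))
```

A consumer that expects a Long key would need to read 8 bytes, so the key deserializer has to match whichever converter the connector uses.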

For more on "mysql - How do I configure the Debezium MySQL connector to produce a primitive key instead of a struct or JSON object?", see the similar question on Stack Overflow: https://stackoverflow.com/questions/55081274/
