
apache-kafka - Debezium's ExtractNewRecordState transformation does not work

Reprinted. Author: 行者123. Updated: 2023-12-04 10:03:47

I am building a data synchronizer that captures data changes from a MySQL source and exports the data to Hive.

I chose Kafka Connect to implement this, using Debezium as the source connector and Confluent's HDFS connector as the sink.

Debezium provides a single message transformation (SMT) that lets me extract the after field from the complex change-event message. I configured it exactly as documented, but it does not work:

{
  // omit ...
  "transform": "unwrap",
  "transform.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}

I have tried configuring the transformation on both the source-connector side and the sink-connector side, and it still does not work. In fact, when I configure it on the source-connector side and then inspect the messages in the corresponding topic, I find that the messages still contain all the fields, including before, source, and so on.
ythh@openstack2:~/confluent-5.5.0$ bin/kafka-avro-console-consumer --from-beginning --bootstrap-server localhost:9092 --topic dbserver1.test_data_1.student3
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":1,"name":"ggg"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005572000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":9474,"row":0,"thread":{"long":6013},"query":null},"op":"c","ts_ms":{"long":1589005572172},"transaction":null}
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":2,"name":"no way"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005893000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":11218,"row":0,"thread":{"long":6030},"query":null},"op":"c","ts_ms":{"long":1589005893773},"transaction":null}
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":3,"name":"not work"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005900000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":11501,"row":0,"thread":{"long":6030},"query":null},"op":"c","ts_ms":{"long":1589005900724},"transaction":null}
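For comparison, what ExtractNewRecordState is supposed to do is flatten each change-event envelope down to just the row state. A minimal Python sketch of that unwrapping, using a cut-down copy of the first record above (not the actual SMT implementation, just the concept):

```python
# Conceptual sketch of what the ExtractNewRecordState SMT does:
# pull the "after" row state out of a Debezium change-event envelope.
# The envelope below mirrors (in abbreviated form) the first
# console-consumer record shown above.
event = {
    "before": None,
    "after": {"dbserver1.test_data_1.student3.Value": {"id": 1, "name": "ggg"}},
    "op": "c",
}

def unwrap(envelope: dict) -> dict:
    # The Avro union wraps the row under a record-name key;
    # take the single inner value as the flattened record.
    (row,) = envelope["after"].values()
    return row

print(unwrap(event))  # {'id': 1, 'name': 'ggg'}
```

With the SMT actually applied, the topic messages would carry only these row fields instead of the full envelope.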

I also checked the Kafka Connect log; here is some of the output:
ythh@openstack2:~/kafka_2.12-2.5.0/logs$ cat connect.log | grep transform
transforms = []
(the line above repeats many times)
[2020-05-09 14:29:30,470] INFO transform.unwrap.type = io.debezium.transforms.ExtractNewRecordState (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,470] INFO transform = unwrap (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,471] INFO transform.unwrap.drop.tombstones = false (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,471] INFO transform.unwrap.delete.handling.mode = rewrite (io.debezium.connector.common.BaseSourceTask:97)
transforms = []
transforms = []
[2020-05-09 14:29:32,419] INFO transform.unwrap.type = io.debezium.transforms.ExtractNewRecordState (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:32,419] INFO transform = unwrap (io.debezium.connector.common.BaseSourceTask:97)
transforms = []
(the line above repeats many times)

Best Answer

It looks like you made a typo (transform instead of transforms). Note also in your log that transforms = [] stays empty while the misspelled transform.unwrap.* keys are merely echoed back as unrecognized properties. Try this configuration:

{
  // omit ...
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
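Because Kafka Connect silently ignores unknown top-level keys, this kind of typo produces no error. A small, hypothetical sanity check (not part of Kafka Connect itself) that flags SMT keys using the singular "transform" prefix might look like:

```python
# Minimal sketch: flag Kafka Connect connector-config keys that use the
# singular "transform" prefix instead of the required "transforms".
def find_smt_typos(config: dict) -> list[str]:
    return [k for k in config
            if k == "transform" or k.startswith("transform.")]

bad = find_smt_typos({
    "transform": "unwrap",
    "transform.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
})
print(bad)  # both misspelled keys are reported
```

Running this against the question's config reports both offending keys, which is exactly the symptom seen in the log above.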

Regarding "apache-kafka - Debezium's ExtractNewRecordState transformation does not work", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61692900/
