gpt4 book ai didi

apache-kafka - 如何将单个消息转换与 Kafka Connect JDBC Source Connector 和多个表一起使用?

转载 作者:行者123 更新时间:2023-12-02 19:56:00 25 4
gpt4 key购买 nike

我想在使用 Kafka Connect Source JDBC 连接器导入表时设置消息键。

当定义多个表从 JDBC 连接器读取时,Kafka Connect/Source 中的单消息转换 (SMT) 如何定位到正确的字段? SMT 需要一个列名,当有多个表时该列名可能会有所不同。

我没有找到根据表名称或类似名称过滤 SMT 定义的方法。下面的代码示例工作正常,因为它只有一张表。

但是如果您有不同的表该怎么办,例如用户、订单、产品?

"table.whitelist" : "User"
"transforms":"createKey,extract",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"user_id",
"transforms.extract.type":"org.apache.kafka.connect.transforms.ExtractField\$Key",
"transforms.extract.field":"user_id",

当具有该配置的工作任务遇到没有该 user_id 字段的表时,它会崩溃并保持 FAILED 状态

org.apache.kafka.connect.errors.ConnectException: 
Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more

这是合理的,因为不可能通过表格或目标光学器件来定义,不是吗?我希望能够将转换限制为给定的表或主题,例如类似的东西

transforms.<topic-name>.createKey.type

我是否遗漏了什么或者是连接限制?

最佳答案

不可能仅将 SMT 应用于特定主题,因为这是连接器级别配置,意味着它应用于每条已处理的消息。

我建议您为每个主题创建不同的连接器,以便您可以仅将 SMT 应用于主题的子集。

关于apache-kafka - 如何将单个消息转换与 Kafka Connect JDBC Source Connector 和多个表一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57019861/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com