gpt4 book ai didi

sql-server - 如何使用 Debezium 从 MS SQL 将 250 个表摄取到 Kafka

转载 作者:行者123 更新时间:2023-12-04 10:21:55 28 4
gpt4 key购买 nike

嗨,我尝试在 PostgreSQL 之间构建 Kafka 连接管道作为源到 SQL Server 作为目标。我使用了 3 个 Kafka broker,需要消费 252 个主题(一个主题与一张 PostgreSQL 表相同)。跑了一个多小时,252张 table 只能拉出218张。我发现的错误是 SQL Server 中存在死锁机制,它可以将事务保存到 SQL Server 并尝试重试,Debezium 复制槽也存在。

我在接收器上使用最多 3 个 worker 的分布式连接器,但也许这似乎还不够。还可以尝试使用更高的 offset.time_out.ms 到 60000 和更高的偏移分区 (100)。恐怕这不是我想要的生产水平。任何人都可以就此案提出建议吗?是否有任何计算来决定我需要的最佳 worker 数量?

更新

我得到了一些错误。我看到一些连接器被杀死了。
一告诉我 SQL SERVER 中发生死锁 :

[2020-03-26 15:06:28,494] ERROR WorkerSinkTask{id=sql_server_sink_XXA-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

2020 年 4 月 14 日更新

我仍然有这个问题,我忘了告诉我如何部署连接器。现在我使用 2 个 worker ,一个用于源,一个用于接收器。我在 csv 中列出我的所有表和 pk 并循环遍历行以创建连接器而无需 sleep 或等待每分钟。我还为每个主题使用单个主题分区和 3 个副本。但我仍然有 sql server 连接死锁

最佳答案

问题可能是同时访问多个任务的同一个 SQL 表 并导致同步问题,如您提到的死锁。
由于您已经拥有大量主题,并且您的连接器可以并行访问它们,因此我建议您使用 将每个主题的分区数减少到 1 (Kafka 不支持减少分区数,因此您应该删除并使用新的分区数重新创建每个主题)。
这样,每个主题只有一个分区;每个分区只能在单个线程(/task/consumer)中访问,因此没有机会对同一个表进行并行 SQL 事务。

或者,更好的方法是 创建一个包含 3 个分区的主题 (与您拥有的任务/消费者数量相同)并制作 生产者使用 SQL 表名作为消息键 .
Kafka 保证具有相同键的消息总是转到同一个分区,因此具有相同表的所有消息将驻留在单个分区上(单线程消耗)。

如果您觉得它有用,我可以附上有关如何创建 Kafka Producer 和发送 key 消息的更多信息。

关于sql-server - 如何使用 Debezium 从 MS SQL 将 250 个表摄取到 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60814203/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com