apache-kafka - Debezium unable to snapshot a large table


I think I may be missing some configuration, but we are trying to use Debezium to snapshot all rows of a table with about 8 million records, and after a while it stops.

The connector configuration is:

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.user": "MyUser",
  "database.server.id": "12345",
  "tasks.max": "1",
  "database.history.kafka.bootstrap.servers": "MyKafka:9092",
  "database.history.kafka.topic": "MyConnectorHistory",
  "database.server.name": "MyDbName",
  "database.port": "3306",
  "table.whitelist": "BigTable",
  "decimal.handling.mode": "double",
  "database.hostname": "***",
  "database.password": "***",
  "name": "MyConnector",
  "database.whitelist": "MyDb",
  "snapshot.mode": "initial_only",
  "connect.timeout.ms": "60000"
}

The connector starts scanning rows:

  April 24th 2019, 13:06:52.573 2019-04-24 16:06:52,569 INFO   MySQL|MyDbName|snapshot  Step 9: - 2040000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:59:29.129   [io.debezium.connector.mysql.SnapshotReader]
... other prints
April 24th 2019, 12:17:28.448 2019-04-24 15:17:28,447 INFO MySQL|MyDbName|snapshot Step 9: - 50000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:10:05.008 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:43.183 2019-04-24 15:07:43,183 INFO MySQL|MyDbName|snapshot Step 9: - 40000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:19.744 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:36.499 2019-04-24 15:07:36,498 INFO MySQL|MyDbName|snapshot Step 9: - 30000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:13.059 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:30.157 2019-04-24 15:07:30,157 INFO MySQL|MyDbName|snapshot Step 9: - 20000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:06.718 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:25.116 2019-04-24 15:07:25,116 INFO MySQL|MyDbName|snapshot Step 9: - 10000 of 8609643 rows scanned from table 'MyDb.BigTable' after 00:00:01.677 [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.439 2019-04-24 15:07:23,439 INFO MySQL|MyDbName|snapshot Step 9: - scanning table 'MyDb.BigTable' (1 of 10 tables) [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.427 2019-04-24 15:07:23,427 INFO MySQL|MyDbName|snapshot Step 8: tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables. [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.427 2019-04-24 15:07:23,427 INFO MySQL|MyDbName|snapshot Step 9: scanning contents of 10 tables while still in transaction [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.143 2019-04-24 15:07:23,143 INFO MySQL|MyDbName|snapshot Step 7: generating DROP and CREATE statements to reflect current database schemas: [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:23.142 2019-04-24 15:07:23,142 INFO MySQL|MyDbName|snapshot Step 6: read binlog position of MySQL master [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.739 2019-04-24 15:07:22,739 INFO MySQL|MyDbName|snapshot Step 5: flush and obtain read lock for 10 tables (preventing writes) [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.635 2019-04-24 15:07:22,635 INFO MySQL|MyDbName|snapshot Step 4: read list of available tables in each database [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.633 2019-04-24 15:07:22,633 INFO MySQL|MyDbName|snapshot Step 3: read list of available databases [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.632 2019-04-24 15:07:22,632 INFO MySQL|MyDbName|snapshot Step 2: start transaction with consistent snapshot [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.632 2019-04-24 15:07:22,631 INFO MySQL|MyDbName|snapshot Step 1: unable to flush and acquire global read lock, will use table read locks after reading table names [io.debezium.connector.mysql.SnapshotReader]
April 24th 2019, 12:07:22.617 2019-04-24 15:07:22,617 INFO MySQL|MyDbName|snapshot Step 1: flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.SnapshotReader]

After a while, we get:

    Failed to flush, timed out while waiting for producer to flush outstanding 4094 messages
Failed to commit offsets [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]

Then the scanning stops, and there are several attempts to flush and commit offsets again:

  April 24th 2019, 12:34:08.641 2019-04-24 15:34:08,641 ERROR  ||  WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets   [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
April 24th 2019, 12:34:08.640 2019-04-24 15:34:08,640 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 5560 messages [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:33:18.640 2019-04-24 15:33:18,640 INFO || WorkerSourceTask{id=MyConnectorr-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:33:18.640 2019-04-24 15:33:18,640 INFO || WorkerSourceTask{id=MyConnectorr-0} flushing 5560 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:32:18.640 2019-04-24 15:32:18,640 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
April 24th 2019, 12:32:18.639 2019-04-24 15:32:18,639 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 5560 messages [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:31:28.639 2019-04-24 15:31:28,639 INFO || WorkerSourceTask{id=MyConnectorr-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:31:28.639 2019-04-24 15:31:28,639 INFO || WorkerSourceTask{id=MyConnectorr-0} flushing 5560 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:30:28.639 2019-04-24 15:30:28,639 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to commit offsets [org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter]
April 24th 2019, 12:30:28.636 2019-04-24 15:30:28,635 ERROR || WorkerSourceTask{id=MyConnectorr-0} Failed to flush, timed out while waiting for producer to flush outstanding 652 messages [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:29:38.635 2019-04-24 15:29:38,635 INFO || WorkerSourceTask{id=MyConnectorr-0} flushing 5556 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
April 24th 2019, 12:29:38.635 2019-04-24 15:29:38,635 INFO || WorkerSourceTask{id=MyConnectorr-0} Committing offsets

After a while (about 9-10 minutes) it seems to recover and starts scanning rows again, but after some time it fails once more; the connector then changes the task state to FAILED without having finished all records.

One of the errors was:

{
  "name": "MyConnector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "svc.cluster.local:8083"
  },
  "tasks": [
    {
      "state": "FAILED",
      "trace": "org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing\n\tat org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:318)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:197)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n",
      "id": 0,
      "worker_id": "svc.cluster.local:8083"
    }
  ],
  "type": "source"
}

I read this issue: https://github.com/confluentinc/kafka-connect-jdbc/issues/161 and tried changing the parameter values as suggested. Things improved, but after a while it still failed. My Connect configuration is now:

OFFSET_FLUSH_INTERVAL_MS: 60000
OFFSET_FLUSH_TIMEOUT_MS: 50000
CONNECT_PRODUCER_BUFFER_MEMORY: 45554432

I also tried the values described here: Debezium flush timeout and OutOfMemoryError errors with MySQL

One thing I have not tried yet is the snapshot.select.statement.overrides parameter. I'm not sure it would help, though, because the offset-commit problem sometimes already happens at around 100k messages, so I would have to stop and resume the connector many times.
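For reference, a minimal sketch of what such an override could look like in the connector configuration; the key column and WHERE clause below are hypothetical, and Debezium expects one per-table property named after the fully qualified table:

"snapshot.select.statement.overrides": "MyDb.BigTable",
"snapshot.select.statement.overrides.MyDb.BigTable": "SELECT * FROM MyDb.BigTable WHERE id > 1000000 ORDER BY id"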

Best answer

I have used Debezium to snapshot MySQL databases with several tables of more than 30 million records each. We also have one table with more than 100 million records; for that one I used the select statement override configuration (since it is an INSERT-only table).

Initially, snapshotting the database with the default settings, I ran into exactly the same problem you are facing. Tuning the following configurations helped resolve it for me.

Kafka Connect worker configuration, set in the worker.properties configuration file:

offset.flush.timeout.ms=60000
offset.flush.interval.ms=10000
max.request.size=10485760

Reducing the offset flush interval lets Kafka Connect flush offsets more frequently, and the larger timeout gives it more time to receive the commit acknowledgement.
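If your worker is configured through container environment variables rather than a worker.properties file (as the CONNECT_/OFFSET_ style settings in the question suggest), the equivalent variables would presumably look like the following; the exact names depend on the Connect image you use, so treat these as an assumption:

OFFSET_FLUSH_TIMEOUT_MS: 60000
OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_PRODUCER_MAX_REQUEST_SIZE: 10485760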

Debezium connector configuration, passed in the curl request that registers the connector:

max.queue.size=81290
max.batch.size=20480

The default queue size is 8192, which is quite low for a larger database. Raising these settings helped a lot.
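As a sketch of how this could be applied, using the connector settings from the question plus the two new properties (the Connect host and the use of PUT .../config to update an existing connector are assumptions about your setup):

curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/MyConnector/config -d '{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "tasks.max": "1",
  "database.hostname": "***",
  "database.port": "3306",
  "database.user": "MyUser",
  "database.password": "***",
  "database.server.id": "12345",
  "database.server.name": "MyDbName",
  "database.whitelist": "MyDb",
  "table.whitelist": "BigTable",
  "database.history.kafka.bootstrap.servers": "MyKafka:9092",
  "database.history.kafka.topic": "MyConnectorHistory",
  "decimal.handling.mode": "double",
  "snapshot.mode": "initial_only",
  "connect.timeout.ms": "60000",
  "max.queue.size": "81290",
  "max.batch.size": "20480"
}'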

I hope this helps with your problem.

The corresponding question on Stack Overflow: https://stackoverflow.com/questions/55839310/
