gpt4 book ai didi

mysql - 通过 Apache-kafka 将删除事件从 MySQL 流式传输到 PostgreSQL

转载 作者:可可西里 更新时间:2023-11-01 07:38:28 25 4
gpt4 key购买 nike

我正在尝试使用 Apache Kafka 将事件从 MySQL 流式传输到 PostgreSQL。虽然插入和更新工作正常,但我无法确定了解如何从 MySQL 中删除记录并将此事件流式传输到 PostgreSQL

假设以下拓扑结构:

               +-------------+
| |
| MySQL |
| |
+------+------+
|
|
|
+---------------v------------------+
| |
| Kafka Connect |
| (Debezium, JDBC connectors) |
| |
+---------------+------------------+
|
|
|
|
+-------v--------+
| |
| PostgreSQL |
| |
+----------------+

我正在使用以下 docker 镜像;

  1. Apache-Zookeper
  2. Apache-Kafka
  3. Debezium/JDBC connectors

然后

# Start the application
export DEBEZIUM_VERSION=0.6
docker-compose up

# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

这里是MySQL数据库的内容;

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+

并且可以验证PostgresSQL的内容是一致的;

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org
(4 rows)

假设我想从MySQL数据库中删除id=1004的记录;

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
mysql> delete from customers where id = 1004;


docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
+------+------------+-----------+-----------------------+

虽然这条记录从MySQL中被删除,但该条目仍然出现在PostgresSQL中

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'

last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org
(4 rows)

我知道支持软删除,但是,是否也可以从 PostgresSQL 中完全删除该特定条目(通过 Apache-Kafka 从 MySQL 流式传输 del 事件)?

编辑:

这是source.json文件的内容

{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}

这里是jdbc-sink.json文件的内容

{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}

我也尝试过设置 "pk.mode": "record_key""delete.enabled": "true" ( bug fix suggestion ) 但这个修改似乎不起作用。

最佳答案

Confluent JDBC 接收器连接器目前不支持删除。有一个待处理的拉取请求(您已经链接到它),但尚未合并。

目前,您可以自己构建基于该分支的 JDBC 接收器连接器,或者创建一个简单的自定义接收器连接器,该连接器仅通过在目标数据库上执行相应的 DELETE 语句来处理逻辑删除事件。

关于mysql - 通过 Apache-kafka 将删除事件从 MySQL 流式传输到 PostgreSQL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47866065/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com