mysql - How to filter tables from a database in the Kafka JDBC source connector

I'm using Kafka Connect on the Confluent community platform to keep MySQL databases in sync. Both the source and the sink are MySQL databases. It isn't working.

There are a few problems in my situation:

  1. There are tables in other databases on the same server that I don't want to read into Kafka, but the Kafka Connect source keeps trying to read the other databases.

  2. I want to use org.apache.kafka.connect.json.JsonConverter in both the source connector and the sink connector, but the sink connector can't insert the data correctly.

  3. I want to synchronize several databases, and tables in different databases may have the same names. How do I avoid table-name conflicts and get the sink connectors to route the Kafka topics correctly, so that data is inserted into the right databases? [Image: MySQL Synchronization illustration]

The Kafka JDBC source connector configuration file is:

{
  "name": "br-auths-3910472223-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",

    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/br_auths?user=root&password=123456",
    "database.whitelist": "br_auths",
    "table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",

    "mode": "timestamp",
    "timestamp.column.name": "utime",
    "validate.non.null": "false",

    "incrementing.column.name": "id",
    "topic.prefix": "br_auths__"
  }
}

The Kafka JDBC sink connector configuration file is:

{
  "name": "br-auths-3910472223-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",

    "tasks.max": "1",
    "connection.url": "jdbc:mysql://rm-hp303a0n2vr8970.mysql.huhehaote.rds.aliyuncs.com:3306/dev-br-auths-391047222?user=br_auths&password=@123456",

    "topics": "br_auths__auths_roles,br_auths__auths_user_logins,br_auths__auths_user_roles,br_auths__auths_users,br_auths__auths_user_claims,br_auths__auths_user_tokens,br_auths__auths_role_claims",

    "auto.create": "true",
    "insert.mode": "upsert",

    "transforms": "dropTopicPrefix",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex": "br_auths__(.*)",
    "transforms.dropTopicPrefix.replacement": "$1"
  }
}

I want to create several pairs of source and sink connectors for the different databases, so that the whitelisted tables in database A on MySQL server A are incrementally synchronized to database A on MySQL server B.

Update 1:

I switched to connect-avro-distributed, the Debezium source connector, and the JDBC sink connector. The source connector is:

{
  "name": "br-auths-3910472223-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "br123456",
    "database.useLegacyDatetimeCode": "false",
    "database.server.id": "184",
    "database.server.name": "local3910472223",
    "database.whitelist": "br_auths",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.br-auths.local3910472223",
    "table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",
    "include.schema.changes": "true",
    "transforms": "route,TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.target.type": "string",
    "transforms.TimestampConverter.field": "payload.after.ctime",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2__$3"
  }
}

The sink connector is:

{
  "name": "br-auths-3910472223-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://rm-hp303a0n2.mysql.huhehaote.rds.aliyuncs.com:3306/dev-br-auths-391047222?useLegacyDatetimeCode=false&user=br_auths&password=123456",
    "dialect.name": "MySqlDatabaseDialect",
    "topics.regex": "br_auths__(.*)",
    "transforms": "dropTopicPrefix,unwrap",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex": "br_auths__(.*)",
    "transforms.dropTopicPrefix.replacement": "$1",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "insert.mode": "upsert",
    "pk.fields": "Id",
    "pk.mode": "record_value"
  }
}

The Avro message, converted to JSON, looks like this:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "Id" },
          { "type": "string", "optional": false, "field": "UserId" },
          { "type": "string", "optional": false, "field": "RoleId" },
          { "type": "string", "optional": true, "field": "APPID" },
          { "type": "int32", "optional": false, "default": 0, "field": "IsDeleted" },
          { "type": "int64", "optional": false, "name": "io.debezium.time.Timestamp", "version": 1, "default": 0, "field": "ctime" },
          { "type": "int64", "optional": false, "name": "io.debezium.time.Timestamp", "version": 1, "default": 0, "field": "utime" }
        ],
        "optional": true,
        "name": "local3910472223.br_auths.auths_user_roles.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "Id" },
          { "type": "string", "optional": false, "field": "UserId" },
          { "type": "string", "optional": false, "field": "RoleId" },
          { "type": "string", "optional": true, "field": "APPID" },
          { "type": "int32", "optional": false, "default": 0, "field": "IsDeleted" },
          { "type": "int64", "optional": false, "name": "io.debezium.time.Timestamp", "version": 1, "default": 0, "field": "ctime" },
          { "type": "int64", "optional": false, "name": "io.debezium.time.Timestamp", "version": 1, "default": 0, "field": "utime" }
        ],
        "optional": true,
        "name": "local3910472223.br_auths.auths_user_roles.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": true, "field": "version" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "server_id" },
          { "type": "int64", "optional": false, "field": "ts_sec" },
          { "type": "string", "optional": true, "field": "gtid" },
          { "type": "string", "optional": false, "field": "file" },
          { "type": "int64", "optional": false, "field": "pos" },
          { "type": "int32", "optional": false, "field": "row" },
          { "type": "boolean", "optional": true, "default": false, "field": "snapshot" },
          { "type": "int64", "optional": true, "field": "thread" },
          { "type": "string", "optional": true, "field": "db" },
          { "type": "string", "optional": true, "field": "table" },
          { "type": "string", "optional": true, "field": "query" }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" }
    ],
    "optional": false,
    "name": "local3910472223.br_auths.auths_user_roles.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "Id": "DB4DA841364860D112C3C76BDCB36635",
      "UserId": "0000000000",
      "RoleId": "5b7e5f9b4bc00d89c4cf96ae",
      "APPID": "br.region2",
      "IsDeleted": 0,
      "ctime": 1550138524000,
      "utime": 1550138524000
    },
    "source": {
      "version": "0.8.3.Final",
      "name": "local3910472223",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 64606,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "br_auths",
      "table": "auths_user_roles",
      "query": null
    },
    "op": "c",
    "ts_ms": 1550568556614
  }
}

Columns of the MySQL datetime type are serialized as big integers, and the JDBC sink connector fails when it tries to insert them into MySQL datetime columns.

So I added transforms.TimestampConverter to the source connector configuration, but the ctime and utime columns were not changed. What's wrong?

Best Answer

  1. If you want to keep databases in sync, the JDBC source connector is not the best choice - you want proper log-based CDC, which for MySQL you can get with Debezium. More details here.
  2. If you're not doing anything else with the data, do you even need Kafka? Would a dedicated MySQL replication tool be more appropriate?
  3. To your specific questions: This article will address a lot of them. In particular:

    1. There are tables in other databases in the same server, and i don't want to read them into Kafka, but Kafka Connect Source keep trying to read other databases.

      Use a combination of table.whitelist, table.blacklist, and schema.pattern as appropriate. If you can't match everything you need with a single connector, use multiple connectors to build up the desired setup (see the first sketch after this list).

    2. I want to use org.apache.kafka.connect.json.JsonConverter in both Source Connector and Sink Connector, but sink connectors couldn't insert correctly.

      It's hard to answer this without you explaining what "couldn't insert correctly" means. In general, I would use Avro instead, since it has richer schema support and more efficient messages (no embedded schema - the schema is held in the Schema Registry); a sample converter configuration follows this list. See here for more details.

    3. I want to synchronize several databases, tables in different databases may be with same table names, how to avoid table names conflict and sink connectors route the Kafka topics correctly to insert data into the right databases?

      You'll want to use topic.prefix on the source connectors to namespace the topics coming from each particular source, and then the RegexRouter single message transform (which, as you've already discovered, can be used in source and/or sink connectors) to manipulate the topic names further. You may need multiple sink connectors, using topics.regex to select the specific topics to route to a specific database (a routing sketch follows this list).
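
For point 1, a minimal sketch of the relevant source-connector properties. The whitelisted table names are illustrative; also note that database.whitelist (used in the original config) appears to be a Debezium property rather than a Confluent JDBC source connector one, which may be why other databases were still being scanned. Depending on the MySQL driver, the database may be exposed as a JDBC catalog, in which case catalog.pattern rather than schema.pattern is the property that narrows the scan:

{
  "name": "br-auths-whitelist-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/br_auths?user=root&password=123456",
    "schema.pattern": "br_auths",
    "table.whitelist": "auths_users,auths_roles,auths_user_roles",
    "mode": "timestamp",
    "timestamp.column.name": "utime",
    "topic.prefix": "br_auths__",
    "tasks.max": "1"
  }
}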
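
For point 2, switching both connectors from JSON to Avro is just a converter change. A sketch, assuming a Schema Registry running at http://localhost:8081 (the URL is illustrative); the same four properties go into both the source and the sink connector config:

"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"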
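
For point 3, one pattern for avoiding name clashes is to encode the server and database into each source's topic.prefix, and to strip it again in the sink that writes to the matching target database. The serverA_dbA__ prefix below is purely illustrative. On the source:

"topic.prefix": "serverA_dbA__"

and on the corresponding sink:

"topics.regex": "serverA_dbA__(.*)",
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "serverA_dbA__(.*)",
"transforms.dropPrefix.replacement": "$1"

With one such source/sink pair per database, two tables that share a name in different databases land on different topics (serverA_dbA__users vs serverA_dbB__users) and are routed to the right targets.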
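
As for the TimestampConverter question in Update 1: a plausible explanation is that the stock org.apache.kafka.connect.transforms.TimestampConverter only operates on top-level fields of the record value, so a nested path like payload.after.ctime inside the Debezium envelope never matches, and the transform silently does nothing. One workaround sketch (the convertCtime and convertUtime aliases are illustrative) is to do the conversion in the sink connector instead, after UnwrapFromEnvelope has flattened the record so that ctime and utime are top-level epoch-millisecond fields:

"transforms": "dropTopicPrefix,unwrap,convertCtime,convertUtime",
"transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex": "br_auths__(.*)",
"transforms.dropTopicPrefix.replacement": "$1",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.convertCtime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertCtime.field": "ctime",
"transforms.convertCtime.target.type": "Timestamp",
"transforms.convertUtime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertUtime.field": "utime",
"transforms.convertUtime.target.type": "Timestamp"

With target.type set to Timestamp, the values arrive at the JDBC sink as Connect Timestamp logical types, which the MySQL dialect should map to datetime columns.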

About mysql - How to filter tables from a database in the Kafka JDBC source connector: we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54760192/
