- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Debezium 将上游数据库中的表同步到下游数据库,遵循 Debezium 博客中描述的方法 here .
在下游表中,我只需要上游表中的某些列。我还想更改一些列名称(包括主键名称)。 如果我不尝试重命名主键,同步工作不会出现任何问题。
我正在使用:
我在下面列出了我的数据库和连接器设置的完整详细信息。
(1)数据库表定义:
上游表的 DDL 是:
CREATE TABLE [kafkatest.service1].dbo.Users (
Id int IDENTITY(1,1) NOT NULL,
Name nvarchar COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
CONSTRAINT PK_Users PRIMARY KEY (Id)
) GO
下游表的DDL是:
CREATE TABLE [kafkatest.service2].dbo.Users (
LocalId int IDENTITY(1,1) NOT NULL, // added to avoid IDENTITY_INSERT issue with SQL Server
ExternalId int NOT NULL,
ExternalName nvarchar COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
CONSTRAINT PK_Users PRIMARY KEY (LocalId)
) GO
特别注意upstream表中的'Id'列(主键)应该映射到'ExternalId' 下游 表中的列。
(2) Kafka Connect连接器定义:
源连接器:
{
"name": "users-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.server.name": "sqlserver",
"database.hostname": "sqlserver",
"database.port": "1433",
"database.user": "sa",
"database.password": "Password!",
"database.dbname": "kafkatest.service1",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.users",
"table.whitelist": "dbo.Users"
}
}
接收器连接器:
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "sqlserver\\.dbo\\.(Users)",
"connection.url": "jdbc:sqlserver://sqlserver:1433;databaseName=kafkatest.service2",
"connection.user": "sa",
"connection.password": "Password!",
"transforms": "unwrap,route,RenameField",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(?:[^.]+)\\.(?:[^.]+)\\.([^.]+)",
"transforms.route.replacement": "$1",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "Id:ExternalId,Name:ExternalName",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "Id",
"pk.mode": "record_key"
}
}
据我所知,“pk.mode”需要是“record_key”才能启用删除。我已尝试将“pk.fields”值同时设置为“Id”和“ExternalId”,但均无效。
(3) 错误信息:
在第一种情况下(即“pk.fields”:“Id”)我得到以下错误:
2020-08-18 10:16:16,951 INFO || Unable to find fields [SinkRecordField{schema=Schema{INT32}, name='Id', isPrimaryKey=true}] among column names [ExternalId, ExternalName, LocalId] [io.confluent.connect.jdbc.sink.DbStructure]
2020-08-18 10:16:16,952 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "Users" to add missing field SinkRecordField{schema=Schema{INT32}, name='Id', isPrimaryKey=true}, as the field is not optional and does not have a default value [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER TABLE "Users" to add missing field SinkRecordField{schema=Schema{INT32}, name='Id', isPrimaryKey=true}, as the field is not optional and does not have a default value
在第二种情况下(即“pk.fields”:“ExternalId”)我得到以下错误:
2020-08-18 10:17:50,192 ERROR || WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: PK mode for table 'Users' is RECORD_KEY with configured PK fields [ExternalId], but record key schema does not contain field: ExternalId [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: PK mode for table 'Users' is RECORD_KEY with configured PK fields [ExternalId], but record key schema does not contain field: ExternalId
使用 Debezium 时是否可以重命名主键?或者我是否总是需要构建我的数据库表,以便主键名称在上游和下游数据库中匹配?
最佳答案
尝试重命名关键字段:
"transforms": "unwrap,route,RenameField,RenameKey",
...
"transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.RenameKey.renames": "Id:ExternalId",
当您使用 "pk.mode": "record_key"
时,主键 from the message key习惯build the upsert query statement .
关于apache-kafka - 使用Debezium和Kafka Connect JDBC sink connector同步数据库时如何重命名主键?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63466567/
我想为 python 3 使用 mysql-connector 库。我可以使用 pymysql 代替,但是 mysql-connector 已经有一个连接池实现,而 pymysql 似乎没有。所以我要
哈,我正在尝试在 asp.net 4 中将 CKFinder 与 ckeditor 一起使用,但出现此错误: Could not load type 'CKFinder.Connector.Conne
哈,我正在尝试在 asp.net 4 中将 CKFinder 与 ckeditor 一起使用,但出现此错误: Could not load type 'CKFinder.Connector.Conne
SAP Java 连接器 是否仍然是将 Java 应用程序与 SAP 连接的好方法?将来(尤其是在 ECC 6.0 中)是否会有连接器的支持和维护,或者是使用 ECC 6.0 中的“企业服务”的唯一好
我一直在使用 AzureLogicApp 中的 FTP 连接器将 FTP 服务器中的文件从源文件夹解压缩到目标文件夹。 我已将 FTP 连接器配置为在源文件夹中添加文件时触发。 我面临的问题是此处触发
我一直在使用 AzureLogicApp 中的 FTP 连接器将 FTP 服务器中的文件从源文件夹解压缩到目标文件夹。 我已将 FTP 连接器配置为在源文件夹中添加文件时触发。 我面临的问题是此处触发
1) sudo dpkg -i mysql-connector-python_1.0.12-1ubuntu12.04_all.deb 2) sudo dpkg -i mysql-utilities_1
导入 mysql.connector ModuleNotFoundError:没有名为“mysql.connector”的模块; 'mysql' 不是一个包 pip install mysql-con
我正在为我们的一位客户开发 C# 应用程序。我们已经定义了一些 IDOC 结构。所有规范都在 Excel 表格中。 所以问题是:是否可以从“IDOCTYPE_READ_COMPLETE”函数获取整个
我有这两个表:gantt_tasks 和 gantt_links,我成功将任务添加到第一个表,但如果我尝试添加链接,则会出现此错误。 java.lang.ClassCastException: com
我正在尝试在远程 tomcat 服务器(8.5.39)上部署我的 java 后台。为了使用 https,我在/conf/server.xml 上更改了这些行 我可以在远程服务器上运行t
当我安装mysql-connector-python 2.0.1-1时( http://dev.mysql.com/downloads/connector/python/ ) 在 Ubuntu 14.
为了在 SFTP 写入连接器中获取文件名,我已将文件名存储在变量中并写入文件(暂存),然后将其写入目录。从不同的流中,我需要将相同的文件移动到输出位置,两个流不是相互链接的。 Mule4 中没有 Se
我在 virtualbox 中使用 vagrant。将数据插入数据库时出现此错误。我尝试修复错误的事情: 清除所有缓存、路由和配置 确保我的 env 文件已配置(我已经尝试将 127.0.0.1
我正在准备第一次使用jdbc,并且正在为MySQL安装jdbc驱动程序。 但是,我不清楚将这些文件中的哪些移动到 Eclipse 中的 WEB_INF/lib 文件夹中。它们似乎都包含相同的内容,并一
我使用的是Eclipse Helios v3.6,每次启动时,都会显示以下对话框。但是我不使用颠覆。有人知道如何停止吗? 最佳答案 我遇到过同样的问题。要修复它,我进入了Eclipse插件目录,并搜索
我已经在mongodb服务器中安装了mongo-connector。 我通过发出命令来执行 mongo-connector -m [remote mongo server IP]:[remote mo
我使用的是 gnu/linux 系统,特别是 Fedora 21 64 位。我想通过终端系统启动我的arduino IDE,突然,这个错误出现了: Could not find agent libra
Mysql-connector-java驱动版本问题 由于我的数据库版本是5.7.28 ,在使用java连接mysql时经常出现版本问题。 com.mysql.jdbc.Driver 是
当前问题集: 包含 mysql_connector 对象的 Python 应用程序 只能使用 mysql 访问远程服务器(不能使用 ssh、rsh、telnet 等) 目标: 使用 mysql 连接器
我是一名优秀的程序员,十分优秀!